kfaraz commented on code in PR #19157:
URL: https://github.com/apache/druid/pull/19157#discussion_r2945505590


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
         .count();
   }
 
+  private void waitSegmentsAvailableInBroker()
+  {
+    Set<SegmentId> segments = overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .stream()
+        .map(DataSegment::getId)
+        .collect(Collectors.toSet());
+
+    ITRetryUtil.retryUntilEquals(

Review Comment:
   Instead of `ITRetryUtil`, try using `cluster.callApi().waitForResult()`.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -310,45 +308,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec 
partitionsSpec) throws Ex
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                       .hasValueMatching(Matchers.greaterThan(totalUsed)));
 
-    // performed minor compaction: 1 previously compacted segment + 1 
incrementally compacted segment
+    // performed minor compaction: 1 previously compacted segment + 1 recently 
compacted segment from minor compaction
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
+    Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
-  {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
-
-    final int totalEventsProcessed = indexer
-        .latchableEmitter()
-        .getMetricValues("ingest/events/processed", 
Map.of(DruidMetrics.DATASOURCE, dataSource))
-        .stream()
-        .mapToInt(Number::intValue)
-        .sum();
-    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
-  }
-
-  protected int publish1kRecords(String topic, boolean useTransactions)
+  protected void ingest1kRecords()
   {
     final EventSerializer serializer = new 
JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
-    List<byte[]> records = streamGenerator.generateEvents(10);
-
-    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new 
ArrayList<>();
-    for (byte[] record : records) {
-      producerRecords.add(new ProducerRecord<>(topic, record));
-    }
-
-    if (useTransactions) {
-      kafkaServer.produceRecordsToTopic(producerRecords);
-    } else {
-      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
-    }
-    return producerRecords.size();
+    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 500, 100);

Review Comment:
   Is the large number of records crucial for this test?
   If not, you could try using some of the templates from `MoreResources` such 
as `MoreResources.Task.BASIC_INDEX`, 
`MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS` or 
`MoreResources.MSQ.INSERT_TINY_WIKI_JSON`.



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
         .count();
   }
 
+  private void waitSegmentsAvailableInBroker()
+  {
+    Set<SegmentId> segments = overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .stream()
+        .map(DataSegment::getId)
+        .collect(Collectors.toSet());
+
+    ITRetryUtil.retryUntilEquals(

Review Comment:
   We should wait for a Broker metric instead.
   Does `cluster.callApi().waitForSegmentsToBeAvailable()` not work for this?



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -310,45 +308,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec 
partitionsSpec) throws Ex
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                       .hasValueMatching(Matchers.greaterThan(totalUsed)));
 
-    // performed minor compaction: 1 previously compacted segment + 1 
incrementally compacted segment
+    // performed minor compaction: 1 previously compacted segment + 1 recently 
compacted segment from minor compaction
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
+    Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
-  {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
-
-    final int totalEventsProcessed = indexer
-        .latchableEmitter()
-        .getMetricValues("ingest/events/processed", 
Map.of(DruidMetrics.DATASOURCE, dataSource))
-        .stream()
-        .mapToInt(Number::intValue)
-        .sum();
-    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
-  }
-
-  protected int publish1kRecords(String topic, boolean useTransactions)
+  protected void ingest1kRecords()
   {
     final EventSerializer serializer = new 
JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
-    List<byte[]> records = streamGenerator.generateEvents(10);
-
-    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new 
ArrayList<>();
-    for (byte[] record : records) {
-      producerRecords.add(new ProducerRecord<>(topic, record));
-    }
-
-    if (useTransactions) {
-      kafkaServer.produceRecordsToTopic(producerRecords);
-    } else {
-      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
-    }
-    return producerRecords.size();
+    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 500, 100);

Review Comment:
   For a large dataset (wikipedia 1 day = 24k rows), you could also try the 
following (from `IngestionSmokeTest.test_runIndexParallelTask_andCompactData()`)
   
   ```java
   final String taskId = IdUtils.getRandomId();
       final ParallelIndexSupervisorTask task = TaskBuilder
           .ofTypeIndexParallel()
           .timestampColumn("timestamp")
           .jsonInputFormat()
           .inputSource(Resources.HttpData.wikipedia1Day())
           .dimensions()
           .tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
           .dataSource(dataSource)
           .withId(taskId);
       cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
       cluster.callApi().waitForTaskToSucceed(taskId, 
eventCollector.latchableEmitter());
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
         .count();
   }
 
+  private void waitSegmentsAvailableInBroker()
+  {
+    Set<SegmentId> segments = overlord
+        .bindings()
+        .segmentsMetadataStorage()
+        .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        .stream()
+        .map(DataSegment::getId)
+        .collect(Collectors.toSet());
+
+    ITRetryUtil.retryUntilEquals(
+        () ->
+            broker.bindings()
+                  .getInstance(BrokerServerView.class)
+                  .getTimeline(TableDataSource.create(dataSource))

Review Comment:
   Instead of querying the timeline directly, please use `SELECT id FROM 
sys.segments`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to