kfaraz commented on code in PR #19157:
URL: https://github.com/apache/druid/pull/19157#discussion_r2945505590
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}
+ private void waitSegmentsAvailableInBroker()
+ {
+ Set<SegmentId> segments = overlord
+ .bindings()
+ .segmentsMetadataStorage()
+ .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+ .stream()
+ .map(DataSegment::getId)
+ .collect(Collectors.toSet());
+
+ ITRetryUtil.retryUntilEquals(
Review Comment:
Instead of `ITRetryUtil`, try using `cluster.callApi().waitForResult()`.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -310,45 +308,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec
partitionsSpec) throws Ex
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));
- // performed minor compaction: 1 previously compacted segment + 1
incrementally compacted segment
+ // performed minor compaction: 1 previously compacted segment + 1 recently
compacted segment from minor compaction
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
- Assertions.assertEquals(3000, getTotalRowCount());
+ Assertions.assertEquals(4000, getTotalRowCount());
}
- protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
- {
- indexer.latchableEmitter().waitForEventAggregate(
- event -> event.hasMetricName("ingest/events/processed")
- .hasDimension(DruidMetrics.DATASOURCE, dataSource),
- agg -> agg.hasSumAtLeast(expectedRowCount)
- );
-
- final int totalEventsProcessed = indexer
- .latchableEmitter()
- .getMetricValues("ingest/events/processed",
Map.of(DruidMetrics.DATASOURCE, dataSource))
- .stream()
- .mapToInt(Number::intValue)
- .sum();
- Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
- }
-
- protected int publish1kRecords(String topic, boolean useTransactions)
+ protected void ingest1kRecords()
{
final EventSerializer serializer = new
JsonEventSerializer(overlord.bindings().jsonMapper());
- final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
- List<byte[]> records = streamGenerator.generateEvents(10);
-
- ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new
ArrayList<>();
- for (byte[] record : records) {
- producerRecords.add(new ProducerRecord<>(topic, record));
- }
-
- if (useTransactions) {
- kafkaServer.produceRecordsToTopic(producerRecords);
- } else {
- kafkaServer.produceRecordsWithoutTransaction(producerRecords);
- }
- return producerRecords.size();
+ final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 500, 100);
Review Comment:
Is the large number of records crucial for this test?
If not, you could try using some of the templates from `MoreResources` such
as `MoreResources.Task.BASIC_INDEX`,
`MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS` or
`MoreResources.MSQ.INSERT_TINY_WIKI_JSON`.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}
+ private void waitSegmentsAvailableInBroker()
+ {
+ Set<SegmentId> segments = overlord
+ .bindings()
+ .segmentsMetadataStorage()
+ .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+ .stream()
+ .map(DataSegment::getId)
+ .collect(Collectors.toSet());
+
+ ITRetryUtil.retryUntilEquals(
Review Comment:
We should wait for a Broker metric instead.
Does `cluster.callApi().waitForSegmentsToBeAvailable()` not work for this?
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -310,45 +308,44 @@ public void test_minorCompactionWithMSQ(PartitionsSpec
partitionsSpec) throws Ex
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));
- // performed minor compaction: 1 previously compacted segment + 1
incrementally compacted segment
+ // performed minor compaction: 1 previously compacted segment + 1 recently
compacted segment from minor compaction
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
- Assertions.assertEquals(3000, getTotalRowCount());
+ Assertions.assertEquals(4000, getTotalRowCount());
}
- protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
- {
- indexer.latchableEmitter().waitForEventAggregate(
- event -> event.hasMetricName("ingest/events/processed")
- .hasDimension(DruidMetrics.DATASOURCE, dataSource),
- agg -> agg.hasSumAtLeast(expectedRowCount)
- );
-
- final int totalEventsProcessed = indexer
- .latchableEmitter()
- .getMetricValues("ingest/events/processed",
Map.of(DruidMetrics.DATASOURCE, dataSource))
- .stream()
- .mapToInt(Number::intValue)
- .sum();
- Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
- }
-
- protected int publish1kRecords(String topic, boolean useTransactions)
+ protected void ingest1kRecords()
{
final EventSerializer serializer = new
JsonEventSerializer(overlord.bindings().jsonMapper());
- final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
- List<byte[]> records = streamGenerator.generateEvents(10);
-
- ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new
ArrayList<>();
- for (byte[] record : records) {
- producerRecords.add(new ProducerRecord<>(topic, record));
- }
-
- if (useTransactions) {
- kafkaServer.produceRecordsToTopic(producerRecords);
- } else {
- kafkaServer.produceRecordsWithoutTransaction(producerRecords);
- }
- return producerRecords.size();
+ final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 500, 100);
Review Comment:
For a large dataset (wikipedia 1 day = 24k rows), you could also try the
following approach (from `IngestionSmokeTest.test_runIndexParallelTask_andCompactData()`):
```java
final String taskId = IdUtils.getRandomId();
final ParallelIndexSupervisorTask task = TaskBuilder
.ofTypeIndexParallel()
.timestampColumn("timestamp")
.jsonInputFormat()
.inputSource(Resources.HttpData.wikipedia1Day())
.dimensions()
.tuningConfig(t -> t.withMaxNumConcurrentSubTasks(1))
.dataSource(dataSource)
.withId(taskId);
cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
cluster.callApi().waitForTaskToSucceed(taskId,
eventCollector.latchableEmitter());
```
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java:
##########
@@ -842,6 +839,32 @@ private int getNumSegmentsWith(Granularity granularity)
.count();
}
+ private void waitSegmentsAvailableInBroker()
+ {
+ Set<SegmentId> segments = overlord
+ .bindings()
+ .segmentsMetadataStorage()
+ .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+ .stream()
+ .map(DataSegment::getId)
+ .collect(Collectors.toSet());
+
+ ITRetryUtil.retryUntilEquals(
+ () ->
+ broker.bindings()
+ .getInstance(BrokerServerView.class)
+ .getTimeline(TableDataSource.create(dataSource))
Review Comment:
Instead of querying the timeline directly, please use a SQL query of the form
`SELECT id FROM sys.segments`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]