This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d5abd06b96 Fix flaky KafkaIndexTaskTest. (#12657)
d5abd06b96 is described below

commit d5abd06b9679d0223b8362855de5cb064ad76ef0
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Jun 24 13:53:51 2022 -0700

    Fix flaky KafkaIndexTaskTest. (#12657)
    
    * Fix flaky KafkaIndexTaskTest.
    
    The testRunTransactionModeRollback case had many race conditions. Most notably,
    it would commit a transaction and then immediately check to see that the results
    were *not* indexed. This is racy because it relied on the indexing thread being
    slower than the test thread.
    
    Now, the case waits for the transaction to be processed by the indexing thread
    before checking the results.
    
    * Changes from review.
---
 .../druid/indexing/kafka/KafkaIndexTask.java       |   2 -
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 174 +++++++++++++--------
 .../SeekableStreamIndexTaskTestBase.java           |   8 +-
 3 files changed, 110 insertions(+), 74 deletions(-)
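
As an aside (not part of the commit itself): the fix described in the commit message boils down to polling the offsets the task reports until they pass a target before asserting, instead of asserting immediately after producing. A minimal, self-contained Java sketch of that idea, using hypothetical names (the commit's real helper is awaitConsumedOffsets, shown in the KafkaIndexTaskTest diff below):

    import java.util.Map;
    import java.util.function.Supplier;

    final class OffsetWaitSketch
    {
      // Hypothetical helper: poll a supplier of per-partition offsets until every
      // partition has advanced past its target (next-offset-to-read > target).
      static void awaitPast(Supplier<Map<Integer, Long>> currentOffsets, Map<Integer, Long> targets)
          throws InterruptedException
      {
        while (true) {
          final Map<Integer, Long> current = currentOffsets.get();
          final boolean allDone = targets.entrySet().stream()
              .allMatch(e -> current.getOrDefault(e.getKey(), Long.MIN_VALUE) > e.getValue());
          if (allDone) {
            return;
          }
          Thread.sleep(5); // back off briefly before re-checking
        }
      }
    }

In the test, this style of wait replaces the old countEvents(task) spin loop, so assertions about what was (or was not) indexed only run once the indexing thread has caught up.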

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 623379aae5..e24072e5a9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -38,7 +38,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
 {
   private static final String TYPE = "index_kafka";
 
-  private final KafkaIndexTaskIOConfig ioConfig;
   private final ObjectMapper configMapper;
 
   // This value can be tuned in some tests
@@ -65,7 +64,6 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
         getFormattedGroupId(dataSchema.getDataSource(), TYPE)
     );
     this.configMapper = configMapper;
-    this.ioConfig = ioConfig;
 
     Preconditions.checkArgument(
         ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2067db25ef..afe9c68efe 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -96,6 +96,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -124,7 +125,6 @@ import org.apache.druid.query.scan.ScanQueryConfig;
 import org.apache.druid.query.scan.ScanQueryEngine;
 import org.apache.druid.query.scan.ScanQueryQueryToolChest;
 import org.apache.druid.query.scan.ScanQueryRunnerFactory;
-import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -141,7 +141,6 @@ import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -181,6 +180,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -252,11 +252,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
   private Long maxTotalRows = null;
   private Period intermediateHandoffPeriod = null;
 
-  private AppenderatorsManager appenderatorsManager;
   private String topic;
   private List<ProducerRecord<byte[], byte[]>> records;
   private final Set<Integer> checkpointRequestsHash = new HashSet<>();
-  private RowIngestionMetersFactory rowIngestionMetersFactory;
 
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
@@ -356,7 +354,6 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     topic = getTopicName();
     records = generateRecords(topic);
     reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
-    appenderatorsManager = new TestAppenderatorsManager();
     makeToolboxFactory();
   }
 
@@ -1289,7 +1286,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
                     new StringDimensionSchema("kafka.topic"),
                     new LongDimensionSchema("kafka.offset"),
                     new StringDimensionSchema("kafka.header.encoding")
-                    )
+                )
             ),
             new AggregatorFactory[]{
                 new DoubleSumAggregatorFactory("met1sum", "met1"),
@@ -1324,13 +1321,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
         "\"2008/2012\"", QuerySegmentSpec.class
     );
-    List<ScanResultValue> scanResultValues = scanData(task, interval);
+    List<Map<String, Object>> scanResultValues = scanData(task, interval);
     //verify that there are no records indexed in the rollbacked time period
     Assert.assertEquals(3, Iterables.size(scanResultValues));
 
     int i = 0;
-    for (ScanResultValue result : scanResultValues) {
-      final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
+    for (Map<String, Object> event : scanResultValues) {
       Assert.assertEquals((long) i++, event.get("kafka.offset"));
       Assert.assertEquals(topic, event.get("kafka.topic"));
       Assert.assertEquals("application/json", 
event.get("kafka.header.encoding"));
@@ -1401,13 +1397,11 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
         "\"2008/2012\"", QuerySegmentSpec.class
     );
-    List<ScanResultValue> scanResultValues = scanData(task, interval);
-    //verify that there are no records indexed in the rollbacked time period
+    List<Map<String, Object>> scanResultValues = scanData(task, interval);
     Assert.assertEquals(3, Iterables.size(scanResultValues));
 
     int i = 0;
-    for (ScanResultValue result : scanResultValues) {
-      final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
+    for (Map<String, Object> event : scanResultValues) {
       Assert.assertEquals("application/json", 
event.get("kafka.testheader.encoding"));
       Assert.assertEquals("y", event.get("dim2"));
     }
@@ -2572,7 +2566,9 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
             0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
-            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L)),
+
+            // End offset is one after 12 real messages + 2 txn control messages (last seen message: offset 13).
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 14L)),
             kafkaServer.consumerProperties(),
             KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
             true,
@@ -2594,62 +2590,65 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
       kafkaProducer.commitTransaction();
     }
 
-    while (countEvents(task) != 2) {
-      Thread.sleep(25);
-    }
-
+    awaitConsumedOffsets(task, ImmutableMap.of(0, 1L)); // Consume two real messages
     Assert.assertEquals(2, countEvents(task));
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
     //verify the 2 indexed records
-    final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
-        "\"2008/2010\"", QuerySegmentSpec.class
-    );
-    Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
+    final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue("\"2008/2010\"", QuerySegmentSpec.class);
+    Iterable<Map<String, Object>> scanResultValues = scanData(task, firstInterval);
     Assert.assertEquals(2, Iterables.size(scanResultValues));
 
     // Insert 3 more records and rollback
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
-      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(Iterables.skip(records, 2), 3)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(Iterables.limit(records, 5), 2)) {
         kafkaProducer.send(record).get();
       }
       kafkaProducer.flush();
       kafkaProducer.abortTransaction();
     }
 
+    // Insert up through first 8 items
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(Iterables.limit(records, 8), 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    awaitConsumedOffsets(task, ImmutableMap.of(0, 9L)); // Consume 8 real messages + 2 txn controls
     Assert.assertEquals(2, countEvents(task));
-    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
 
-    final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
-        "\"2010/2012\"", QuerySegmentSpec.class
-    );
+    final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue("\"2010/2012\"", QuerySegmentSpec.class);
     scanResultValues = scanData(task, rollbackedInterval);
     //verify that there are no records indexed in the rollbacked time period
     Assert.assertEquals(0, Iterables.size(scanResultValues));
 
-    // Insert remaining data
+    final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue("\"2008/2049\"", QuerySegmentSpec.class);
+    Iterable<Map<String, Object>> scanResultValues1 = scanData(task, endInterval);
+    Assert.assertEquals(2, Iterables.size(scanResultValues1));
+
+    // Insert all remaining messages. One will get picked up.
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
-      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 8)) {
         kafkaProducer.send(record).get();
       }
       kafkaProducer.commitTransaction();
     }
 
-    final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
-        "\"2008/2049\"", QuerySegmentSpec.class
-    );
-    Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
-    Assert.assertEquals(2, Iterables.size(scanResultValues1));
-
+    // Wait for task to exit and publish
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
 
     // Check metrics
     Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getProcessedWithError());
     Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
 
@@ -2664,7 +2663,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         publishedDescriptors()
     );
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L))),
+        new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 14L))),
         newDataSchemaMetadata()
     );
   }
@@ -2824,11 +2823,44 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     Assert.assertEquals(task, task1);
   }
 
-  private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
+  /**
+   * Wait for a task to consume certain offsets (inclusive).
+   */
+  private void awaitConsumedOffsets(final KafkaIndexTask task, final Map<Integer, Long> targetOffsets)
+      throws InterruptedException
   {
-    ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
-        NEW_DATA_SCHEMA.getDataSource()).intervals(spec).build();
-    return task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
+    while (true) {
+      final ConcurrentMap<Integer, Long> currentOffsets = task.getRunner().getCurrentOffsets();
+
+      // For Kafka, currentOffsets are the last read offsets plus one.
+      boolean allDone = true;
+      for (final Map.Entry<Integer, Long> entry : targetOffsets.entrySet()) {
+        final Long currentOffset = currentOffsets.get(entry.getKey());
+        if (currentOffset == null || currentOffset <= entry.getValue()) {
+          allDone = false;
+          break;
+        }
+      }
+
+      if (allDone) {
+        return;
+      } else {
+        Thread.sleep(5);
+      }
+    }
+  }
+
+  private List<Map<String, Object>> scanData(final Task task, QuerySegmentSpec spec)
+  {
+    ScanQuery query = new Druids.ScanQueryBuilder().dataSource(NEW_DATA_SCHEMA.getDataSource())
+                                                   .intervals(spec)
+                                                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                                   .build();
+
+    return task.getQueryRunner(query)
+               .run(QueryPlus.wrap(query))
+               .flatMap(result -> Sequences.simple((List<Map<String, Object>>) result.getEvents()))
+               .toList();
   }
 
   private void insertData() throws ExecutionException, InterruptedException
@@ -2836,7 +2868,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
     insertData(records);
   }
 
-  private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records) throws ExecutionException, InterruptedException
+  private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records)
+      throws ExecutionException, InterruptedException
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
@@ -2943,28 +2976,28 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
   {
     return new DefaultQueryRunnerFactoryConglomerate(
         ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
-            .put(
-                TimeseriesQuery.class,
-                new TimeseriesQueryRunnerFactory(
-                    new TimeseriesQueryQueryToolChest(),
-                    new TimeseriesQueryEngine(),
-                    (query, future) -> {
-                      // do nothing
-                    }
-                )
-            )
-            .put(
-                ScanQuery.class,
-                new ScanQueryRunnerFactory(
-                    new ScanQueryQueryToolChest(
-                        new ScanQueryConfig(),
-                        new DefaultGenericQueryMetricsFactory()
-                    ),
-                    new ScanQueryEngine(),
-                    new ScanQueryConfig()
-                )
-            )
-            .build()
+                    .put(
+                        TimeseriesQuery.class,
+                        new TimeseriesQueryRunnerFactory(
+                            new TimeseriesQueryQueryToolChest(),
+                            new TimeseriesQueryEngine(),
+                            (query, future) -> {
+                              // do nothing
+                            }
+                        )
+                    )
+                    .put(
+                        ScanQuery.class,
+                        new ScanQueryRunnerFactory(
+                            new ScanQueryQueryToolChest(
+                                new ScanQueryConfig(),
+                                new DefaultGenericQueryMetricsFactory()
+                            ),
+                            new ScanQueryEngine(),
+                            new ScanQueryConfig()
+                        )
+                    )
+                    .build()
     );
   }
 
@@ -2972,7 +3005,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
   {
     directory = tempFolder.newFolder();
     final TestUtils testUtils = new TestUtils();
-    rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
+    RowIngestionMetersFactory rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
     final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
 
     for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
@@ -3135,13 +3168,16 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
 
       //multiple objects in one Kafka record will yield 2 rows in druid
       String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
-                     toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
+                          toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
 
       //multiple objects in one Kafka record but some objects are in ill-formed format
       //as a result, the whole ProducerRecord will be discarded
-      String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
-                     "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
-                     "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+      String malformed =
+          "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 
10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }"
+          +
+          "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 
10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }"
+          +
+          "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 
10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
 
       ProducerRecord<byte[], byte[]>[] producerRecords = new ProducerRecord[]{
           // pretty formatted
@@ -3149,7 +3185,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
           //well-formed
           new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
           //ill-formed
-          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(malformed)),
           //a well-formed record after ill-formed to demonstrate that the ill-formed can be successfully skipped
           new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
       };
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 6078121e98..d1bc388752 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -348,7 +348,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
       List<SegmentDescriptor> actualDescriptors
   ) throws IOException
   {
-    Assert.assertEquals(expectedDescriptors.size(), actualDescriptors.size());
+    Assert.assertEquals("number of segments", expectedDescriptors.size(), 
actualDescriptors.size());
     final Comparator<SegmentDescriptor> comparator = (s1, s2) -> {
       final int intervalCompare = Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval());
       if (intervalCompare == 0) {
@@ -379,7 +379,9 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
       if (expectedDesc.expectedDim1Values.isEmpty()) {
         continue; // Treating empty expectedDim1Values as a signal that checking the dim1 column value is not needed.
       }
-      Assertions.assertThat(readSegmentColumn("dim1", actualDesc)).isIn(expectedDesc.expectedDim1Values);
+      Assertions.assertThat(readSegmentColumn("dim1", actualDesc))
+                .describedAs("dim1 values")
+                .isIn(expectedDesc.expectedDim1Values);
     }
   }
 
@@ -447,7 +449,7 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
                                           new LongSumAggregatorFactory("rows", 
"rows")
                                       )
                                   ).granularity(Granularities.ALL)
-                                  .intervals("0000/3000")
+                                  .intervals(Intervals.ONLY_ETERNITY)
                                   .build();
 
     List<Result<TimeseriesResultValue>> results = task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();

