This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d5abd06b96 Fix flaky KafkaIndexTaskTest. (#12657)
d5abd06b96 is described below
commit d5abd06b9679d0223b8362855de5cb064ad76ef0
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Jun 24 13:53:51 2022 -0700
Fix flaky KafkaIndexTaskTest. (#12657)
* Fix flaky KafkaIndexTaskTest.
The testRunTransactionModeRollback case had many race conditions. Most
notably,
it would commit a transaction and then immediately check to see that the
results
were *not* indexed. This is racy because it relied on the indexing thread
being
slower than the test thread.
Now, the case waits for the transaction to be processed by the indexing
thread
before checking the results.
* Changes from review.
---
.../druid/indexing/kafka/KafkaIndexTask.java | 2 -
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 174 +++++++++++++--------
.../SeekableStreamIndexTaskTestBase.java | 8 +-
3 files changed, 110 insertions(+), 74 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 623379aae5..e24072e5a9 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -38,7 +38,6 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<Integer, Long, Kafka
{
private static final String TYPE = "index_kafka";
- private final KafkaIndexTaskIOConfig ioConfig;
private final ObjectMapper configMapper;
// This value can be tuned in some tests
@@ -65,7 +64,6 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<Integer, Long, Kafka
getFormattedGroupId(dataSchema.getDataSource(), TYPE)
);
this.configMapper = configMapper;
- this.ioConfig = ioConfig;
Preconditions.checkArgument(
ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 2067db25ef..afe9c68efe 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -96,6 +96,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -124,7 +125,6 @@ import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
-import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -141,7 +141,6 @@ import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
@@ -181,6 +180,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -252,11 +252,9 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
private Long maxTotalRows = null;
private Period intermediateHandoffPeriod = null;
- private AppenderatorsManager appenderatorsManager;
private String topic;
private List<ProducerRecord<byte[], byte[]>> records;
private final Set<Integer> checkpointRequestsHash = new HashSet<>();
- private RowIngestionMetersFactory rowIngestionMetersFactory;
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topic)
{
@@ -356,7 +354,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
topic = getTopicName();
records = generateRecords(topic);
reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" +
System.currentTimeMillis(), "json");
- appenderatorsManager = new TestAppenderatorsManager();
makeToolboxFactory();
}
@@ -1289,7 +1286,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
- )
+ )
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
@@ -1324,13 +1321,12 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
"\"2008/2012\"", QuerySegmentSpec.class
);
- List<ScanResultValue> scanResultValues = scanData(task, interval);
+ List<Map<String, Object>> scanResultValues = scanData(task, interval);
//verify that there are no records indexed in the rollbacked time period
Assert.assertEquals(3, Iterables.size(scanResultValues));
int i = 0;
- for (ScanResultValue result : scanResultValues) {
- final Map<String, Object> event = ((List<Map<String, Object>>)
result.getEvents()).get(0);
+ for (Map<String, Object> event : scanResultValues) {
Assert.assertEquals((long) i++, event.get("kafka.offset"));
Assert.assertEquals(topic, event.get("kafka.topic"));
Assert.assertEquals("application/json",
event.get("kafka.header.encoding"));
@@ -1401,13 +1397,11 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
"\"2008/2012\"", QuerySegmentSpec.class
);
- List<ScanResultValue> scanResultValues = scanData(task, interval);
- //verify that there are no records indexed in the rollbacked time period
+ List<Map<String, Object>> scanResultValues = scanData(task, interval);
Assert.assertEquals(3, Iterables.size(scanResultValues));
int i = 0;
- for (ScanResultValue result : scanResultValues) {
- final Map<String, Object> event = ((List<Map<String, Object>>)
result.getEvents()).get(0);
+ for (Map<String, Object> event : scanResultValues) {
Assert.assertEquals("application/json",
event.get("kafka.testheader.encoding"));
Assert.assertEquals("y", event.get("dim2"));
}
@@ -2572,7 +2566,9 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0,
0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0,
13L)),
+
+ // End offset is one after 12 real messages + 2 txn control
messages (last seen message: offset 13).
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0,
14L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
@@ -2594,62 +2590,65 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
kafkaProducer.commitTransaction();
}
- while (countEvents(task) != 2) {
- Thread.sleep(25);
- }
-
+ awaitConsumedOffsets(task, ImmutableMap.of(0, 1L)); // Consume two real
messages
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
//verify the 2 indexed records
- final QuerySegmentSpec firstInterval = OBJECT_MAPPER.readValue(
- "\"2008/2010\"", QuerySegmentSpec.class
- );
- Iterable<ScanResultValue> scanResultValues = scanData(task, firstInterval);
+ final QuerySegmentSpec firstInterval =
OBJECT_MAPPER.readValue("\"2008/2010\"", QuerySegmentSpec.class);
+ Iterable<Map<String, Object>> scanResultValues = scanData(task,
firstInterval);
Assert.assertEquals(2, Iterables.size(scanResultValues));
// Insert 3 more records and rollback
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
- for (ProducerRecord<byte[], byte[]> record :
Iterables.limit(Iterables.skip(records, 2), 3)) {
+ for (ProducerRecord<byte[], byte[]> record :
Iterables.skip(Iterables.limit(records, 5), 2)) {
kafkaProducer.send(record).get();
}
kafkaProducer.flush();
kafkaProducer.abortTransaction();
}
+ // Insert up through first 8 items
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ for (ProducerRecord<byte[], byte[]> record :
Iterables.skip(Iterables.limit(records, 8), 5)) {
+ kafkaProducer.send(record).get();
+ }
+ kafkaProducer.commitTransaction();
+ }
+
+ awaitConsumedOffsets(task, ImmutableMap.of(0, 9L)); // Consume 8 real
messages + 2 txn controls
Assert.assertEquals(2, countEvents(task));
- Assert.assertEquals(Status.READING, task.getRunner().getStatus());
- final QuerySegmentSpec rollbackedInterval = OBJECT_MAPPER.readValue(
- "\"2010/2012\"", QuerySegmentSpec.class
- );
+ final QuerySegmentSpec rollbackedInterval =
OBJECT_MAPPER.readValue("\"2010/2012\"", QuerySegmentSpec.class);
scanResultValues = scanData(task, rollbackedInterval);
//verify that there are no records indexed in the rollbacked time period
Assert.assertEquals(0, Iterables.size(scanResultValues));
- // Insert remaining data
+ final QuerySegmentSpec endInterval =
OBJECT_MAPPER.readValue("\"2008/2049\"", QuerySegmentSpec.class);
+ Iterable<Map<String, Object>> scanResultValues1 = scanData(task,
endInterval);
+ Assert.assertEquals(2, Iterables.size(scanResultValues1));
+
+ // Insert all remaining messages. One will get picked up.
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
- for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5))
{
+ for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 8))
{
kafkaProducer.send(record).get();
}
kafkaProducer.commitTransaction();
}
- final QuerySegmentSpec endInterval = OBJECT_MAPPER.readValue(
- "\"2008/2049\"", QuerySegmentSpec.class
- );
- Iterable<ScanResultValue> scanResultValues1 = scanData(task, endInterval);
- Assert.assertEquals(2, Iterables.size(scanResultValues1));
-
+ // Wait for task to exit and publish
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getRunner().getEndOffsets(),
task.getRunner().getCurrentOffsets());
// Check metrics
Assert.assertEquals(3,
task.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(1,
task.getRunner().getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3,
task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1,
task.getRunner().getRowIngestionMeters().getThrownAway());
@@ -2664,7 +2663,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
publishedDescriptors()
);
Assert.assertEquals(
- new KafkaDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 13L))),
+ new KafkaDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 14L))),
newDataSchemaMetadata()
);
}
@@ -2824,11 +2823,44 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(task, task1);
}
- private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec
spec)
+ /**
+ * Wait for a task to consume certain offsets (inclusive).
+ */
+ private void awaitConsumedOffsets(final KafkaIndexTask task, final
Map<Integer, Long> targetOffsets)
+ throws InterruptedException
{
- ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
- NEW_DATA_SCHEMA.getDataSource()).intervals(spec).build();
- return task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
+ while (true) {
+ final ConcurrentMap<Integer, Long> currentOffsets =
task.getRunner().getCurrentOffsets();
+
+ // For Kafka, currentOffsets are the last read offsets plus one.
+ boolean allDone = true;
+ for (final Map.Entry<Integer, Long> entry : targetOffsets.entrySet()) {
+ final Long currentOffset = currentOffsets.get(entry.getKey());
+ if (currentOffset == null || currentOffset <= entry.getValue()) {
+ allDone = false;
+ break;
+ }
+ }
+
+ if (allDone) {
+ return;
+ } else {
+ Thread.sleep(5);
+ }
+ }
+ }
+
+ private List<Map<String, Object>> scanData(final Task task, QuerySegmentSpec
spec)
+ {
+ ScanQuery query = new
Druids.ScanQueryBuilder().dataSource(NEW_DATA_SCHEMA.getDataSource())
+ .intervals(spec)
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+ .build();
+
+ return task.getQueryRunner(query)
+ .run(QueryPlus.wrap(query))
+ .flatMap(result -> Sequences.simple((List<Map<String, Object>>)
result.getEvents()))
+ .toList();
}
private void insertData() throws ExecutionException, InterruptedException
@@ -2836,7 +2868,8 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
insertData(records);
}
- private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records)
throws ExecutionException, InterruptedException
+ private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records)
+ throws ExecutionException, InterruptedException
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
@@ -2943,28 +2976,28 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
{
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
- .put(
- TimeseriesQuery.class,
- new TimeseriesQueryRunnerFactory(
- new TimeseriesQueryQueryToolChest(),
- new TimeseriesQueryEngine(),
- (query, future) -> {
- // do nothing
- }
- )
- )
- .put(
- ScanQuery.class,
- new ScanQueryRunnerFactory(
- new ScanQueryQueryToolChest(
- new ScanQueryConfig(),
- new DefaultGenericQueryMetricsFactory()
- ),
- new ScanQueryEngine(),
- new ScanQueryConfig()
- )
- )
- .build()
+ .put(
+ TimeseriesQuery.class,
+ new TimeseriesQueryRunnerFactory(
+ new TimeseriesQueryQueryToolChest(),
+ new TimeseriesQueryEngine(),
+ (query, future) -> {
+ // do nothing
+ }
+ )
+ )
+ .put(
+ ScanQuery.class,
+ new ScanQueryRunnerFactory(
+ new ScanQueryQueryToolChest(
+ new ScanQueryConfig(),
+ new DefaultGenericQueryMetricsFactory()
+ ),
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
+ )
+ )
+ .build()
);
}
@@ -2972,7 +3005,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
{
directory = tempFolder.newFolder();
final TestUtils testUtils = new TestUtils();
- rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
+ RowIngestionMetersFactory rowIngestionMetersFactory =
testUtils.getRowIngestionMetersFactory();
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
@@ -3135,13 +3168,16 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
//multiple objects in one Kafka record will yield 2 rows in druid
String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0",
"2.0") +
- toJsonString(true, "2049", "d3", "y", "10", "23.0",
"3.0");
+ toJsonString(true, "2049", "d3", "y", "10", "23.0",
"3.0");
//multiple objects in one Kafka record but some objects are in
ill-formed format
//as a result, the whole ProducerRecord will be discarded
- String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\",
\"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
- "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\",
\"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
- "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\",
\"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+ String malformed =
+ "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\":
10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }"
+ +
+ "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\":
10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }"
+ +
+ "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\":
10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
ProducerRecord<byte[], byte[]>[] producerRecords = new ProducerRecord[]{
// pretty formatted
@@ -3149,7 +3185,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
//well-formed
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
//ill-formed
- new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
+ new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(malformed)),
//a well-formed record after ill-formed to demonstrate that the
ill-formed can be successfully skipped
new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d7", "y",
"10", "20.0", "1.0"))
};
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 6078121e98..d1bc388752 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -348,7 +348,7 @@ public class SeekableStreamIndexTaskTestBase extends
EasyMockSupport
List<SegmentDescriptor> actualDescriptors
) throws IOException
{
- Assert.assertEquals(expectedDescriptors.size(), actualDescriptors.size());
+ Assert.assertEquals("number of segments", expectedDescriptors.size(),
actualDescriptors.size());
final Comparator<SegmentDescriptor> comparator = (s1, s2) -> {
final int intervalCompare =
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(),
s2.getInterval());
if (intervalCompare == 0) {
@@ -379,7 +379,9 @@ public class SeekableStreamIndexTaskTestBase extends
EasyMockSupport
if (expectedDesc.expectedDim1Values.isEmpty()) {
continue; // Treating empty expectedDim1Values as a signal that
checking the dim1 column value is not needed.
}
- Assertions.assertThat(readSegmentColumn("dim1",
actualDesc)).isIn(expectedDesc.expectedDim1Values);
+ Assertions.assertThat(readSegmentColumn("dim1", actualDesc))
+ .describedAs("dim1 values")
+ .isIn(expectedDesc.expectedDim1Values);
}
}
@@ -447,7 +449,7 @@ public class SeekableStreamIndexTaskTestBase extends
EasyMockSupport
new LongSumAggregatorFactory("rows",
"rows")
)
).granularity(Granularities.ALL)
- .intervals("0000/3000")
+ .intervals(Intervals.ONLY_ETERNITY)
.build();
List<Result<TimeseriesResultValue>> results =
task.getQueryRunner(query).run(QueryPlus.wrap(query)).toList();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]