This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new 3e47fa3 Fix testIncrementalHandOffReadsThroughEndOffsets in
Kafka/KinesisIndexTaskTest (#7264) (#7270)
3e47fa3 is described below
commit 3e47fa3146ffb5c7eb15ced63d67a4a57581cc23
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Mar 15 08:19:40 2019 -0700
Fix testIncrementalHandOffReadsThroughEndOffsets in
Kafka/KinesisIndexTaskTest (#7264) (#7270)
* Fix testIncrementalHandOffReadsThroughEndOffsets in
Kafka/KinesisIndexTaskTest
* revert unnecessary change
* fix test
* remove debug log
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 74 ++++--
.../indexing/kinesis/KinesisIndexTaskTest.java | 248 ++++++++++-----------
2 files changed, 176 insertions(+), 146 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index d2f7373..a52dd80 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -87,6 +87,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -903,11 +904,11 @@ public class KafkaIndexTaskTest
final SeekableStreamPartitions<Integer, Long> checkpoint1 =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
final SeekableStreamPartitions<Integer, Long> checkpoint2 =
- new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L));
final SeekableStreamPartitions<Integer, Long> endPartitions =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0,
Long.MAX_VALUE));
- final KafkaIndexTask task = createTask(
+ final KafkaIndexTask normalReplica = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
@@ -922,34 +923,69 @@ public class KafkaIndexTaskTest
false
)
);
- final ListenableFuture<TaskStatus> future = runTask(task);
- while (task.getRunner().getStatus() != Status.PAUSED) {
+ final KafkaIndexTask staleReplica = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ false
+ )
+ );
+
+    final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
+    // Simulating one replica is slower than the other
+    final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
+ taskExec.submit(() -> {
+ Thread.sleep(1000);
+ return staleReplica;
+ }),
+ this::runTask
+ );
+
+ while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
-    Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    staleReplica.getRunner().pause();
+    while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
-    // Simulating the case when another replica has consumed up to the offset of 8
- task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
+ normalReplica.getRunner().setEndOffsets(currentOffsets, false);
+ staleReplica.getRunner().setEndOffsets(currentOffsets, false);
- // The task is supposed to consume remaining rows up to the offset of 13
- while (task.getRunner().getStatus() != Status.PAUSED) {
+ while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
- currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+    currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
- task.getRunner().setEndOffsets(
- ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
- true
- );
+ normalReplica.getRunner().setEndOffsets(currentOffsets, true);
+ staleReplica.getRunner().setEndOffsets(currentOffsets, true);
- Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode());
-    // processed count would be 8 if it stopped at it's current offsets
-    Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(9, normalReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
+
+    Assert.assertEquals(9, staleReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getThrownAway());
}
@Test(timeout = 60_000L)
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index f60fe50..c4c3adb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -92,6 +92,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -2558,6 +2559,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
"task1",
+ DATA_SCHEMA,
new KinesisIndexTaskIOConfig(
null,
"sequence0",
@@ -2617,125 +2619,147 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
- maxRecordsPerPoll = 1;
- recordSupplier.assign(anyObject());
+    final KinesisRecordSupplier recordSupplier1 = mock(KinesisRecordSupplier.class);
+    recordSupplier1.assign(anyObject());
     expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
+    expect(recordSupplier1.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+ recordSupplier1.seek(anyObject(), anyString());
expectLastCall().anyTimes();
-
- expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
- .once()
- .andReturn(records.subList(4, 10))
- .once()
- .andReturn(records.subList(9, 15))
- .once();
-
- recordSupplier.close();
+ expect(recordSupplier1.poll(anyLong())).andReturn(records.subList(0, 5))
+ .once()
+ .andReturn(records.subList(4, 10))
+ .once();
+ recordSupplier1.close();
+ expectLastCall().once();
+    final KinesisRecordSupplier recordSupplier2 = mock(KinesisRecordSupplier.class);
+    recordSupplier2.assign(anyObject());
+    expectLastCall().anyTimes();
+    expect(recordSupplier2.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+ recordSupplier2.seek(anyObject(), anyString());
+ expectLastCall().anyTimes();
+ expect(recordSupplier2.poll(anyLong())).andReturn(records.subList(0, 5))
+ .once()
+ .andReturn(records.subList(4, 10))
+ .once();
+ recordSupplier2.close();
expectLastCall().once();
replayAll();
     final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "0"
-        )
+        ImmutableMap.of(shardId1, "0")
     );
     final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "4"
-        )
+        ImmutableMap.of(shardId1, "4")
     );
     final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "9"
-        )
+        ImmutableMap.of(shardId1, "9")
     );
     final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "14"
-        )
+        ImmutableMap.of(shardId1, "100") // simulating unlimited
     );
- final KinesisIndexTask task = createTask(
+ final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
null,
- new KinesisIndexTaskIOConfig(
- null,
- baseSequenceName,
- startPartitions,
- endPartitions,
- true,
- null,
- null,
- "awsEndpoint",
- null,
- null,
- null,
- null,
- null,
- false
- )
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ true,
+ null,
+ null,
+ "awsEndpoint",
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
);
- final ListenableFuture<TaskStatus> future = runTask(task);
-    while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
+ final KinesisIndexTask normalReplica = createTask(
+ null,
+ DATA_SCHEMA,
+ ioConfig,
+ null
+ );
+    ((TestableKinesisIndexTask) normalReplica).setLocalSupplier(recordSupplier1);
+ final KinesisIndexTask staleReplica = createTask(
+ null,
+ DATA_SCHEMA,
+ ioConfig,
+ null
+ );
+    ((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
+    final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
+    // Simulating one replica is slower than the other
+    final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
+ taskExec.submit(() -> {
+ Thread.sleep(1000);
+ return staleReplica;
+ }),
+ this::runTask
+ );
+
+ while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ staleReplica.getRunner().pause();
+ while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
-    Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Map<String, String> currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
- task.getRunner().setEndOffsets(currentOffsets, false);
- // The task is supposed to consume remaining rows up to the offset of 13
- while (task.getRunner().getStatus() != Status.PAUSED) {
+ normalReplica.getRunner().setEndOffsets(currentOffsets, false);
+ staleReplica.getRunner().setEndOffsets(currentOffsets, false);
+
+ while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
- currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+    currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
-    task.getRunner().setEndOffsets(
-        ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
-        true
-    );
+    normalReplica.getRunner().setEndOffsets(currentOffsets, true);
+    staleReplica.getRunner().setEndOffsets(currentOffsets, true);
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode());
verifyAll();
Assert.assertEquals(2, checkpointRequestsHash.size());
// Check metrics
-    Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(10, normalReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
final Set<SegmentDescriptor> descriptors = new HashSet<>();
- descriptors.add(SD(task, "2008/P1D", 0));
- descriptors.add(SD(task, "2008/P1D", 1));
- descriptors.add(SD(task, "2009/P1D", 0));
- descriptors.add(SD(task, "2010/P1D", 0));
- descriptors.add(SD(task, "2010/P1D", 1));
- descriptors.add(SD(task, "2011/P1D", 0));
- descriptors.add(SD(task, "2011/P1D", 1));
- descriptors.add(SD(task, "2012/P1D", 0));
- descriptors.add(SD(task, "2013/P1D", 0));
+ descriptors.add(SD(normalReplica, "2008/P1D", 0));
+ descriptors.add(SD(normalReplica, "2009/P1D", 0));
+ descriptors.add(SD(normalReplica, "2010/P1D", 0));
+ descriptors.add(SD(normalReplica, "2010/P1D", 1));
+ descriptors.add(SD(normalReplica, "2011/P1D", 0));
+ descriptors.add(SD(normalReplica, "2011/P1D", 1));
+ descriptors.add(SD(normalReplica, "2012/P1D", 0));
+ descriptors.add(SD(normalReplica, "2013/P1D", 0));
Assert.assertEquals(descriptors, publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream,
ImmutableMap.of(
shardId1,
- "10"
+ "9"
))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
@@ -2792,26 +2816,27 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTaskIOConfig ioConfig
)
{
- return createTask(taskId, DATA_SCHEMA, ioConfig);
+ return createTask(taskId, DATA_SCHEMA, ioConfig, null);
}
private KinesisIndexTask createTask(
final String taskId,
- final KinesisIndexTaskIOConfig ioConfig,
- final Map<String, Object> context
+ final DataSchema dataSchema,
+ final KinesisIndexTaskIOConfig ioConfig
)
{
- return createTask(taskId, DATA_SCHEMA, ioConfig, context);
+ return createTask(taskId, dataSchema, ioConfig, null);
}
private KinesisIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
- final KinesisIndexTaskIOConfig ioConfig
+ final KinesisIndexTaskIOConfig ioConfig,
+ @Nullable final Map<String, Object> context
)
{
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
- 1000,
+ maxRowsInMemory,
null,
maxRowsPerSegment,
maxTotalRows,
@@ -2823,11 +2848,11 @@ public class KinesisIndexTaskTest extends EasyMockSupport
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
- skipAvailabilityCheck,
+ true,
+ null,
null,
null,
null,
- 5000,
null,
null,
logParseExceptions,
@@ -2836,58 +2861,20 @@ public class KinesisIndexTaskTest extends EasyMockSupport
maxRecordsPerPoll,
intermediateHandoffPeriod
);
- final Map<String, Object> context = null;
- final KinesisIndexTask task = new TestableKinesisIndexTask(
- taskId,
- null,
- cloneDataSchema(dataSchema),
- tuningConfig,
- ioConfig,
- context,
- null,
- null,
- rowIngestionMetersFactory,
- null
- );
-
- return task;
+ return createTask(taskId, dataSchema, ioConfig, tuningConfig, context);
}
-
private KinesisIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig,
- final Map<String, Object> context
+ final KinesisIndexTaskTuningConfig tuningConfig,
+ @Nullable final Map<String, Object> context
)
{
-    final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
- maxRowsInMemory,
- null,
- maxRowsPerSegment,
- maxTotalRows,
- new Period("P1Y"),
- null,
- null,
- null,
- true,
- reportParseExceptions,
- handoffConditionTimeout,
- resetOffsetAutomatically,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions,
- maxRecordsPerPoll,
- intermediateHandoffPeriod
- );
-    context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+    if (context != null) {
+      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+    }
final KinesisIndexTask task = new TestableKinesisIndexTask(
taskId,
@@ -3245,8 +3232,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
@JsonTypeName("index_kinesis")
private static class TestableKinesisIndexTask extends KinesisIndexTask
{
+ private KinesisRecordSupplier localSupplier;
+
@JsonCreator
- public TestableKinesisIndexTask(
+ private TestableKinesisIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSchema") DataSchema dataSchema,
@@ -3273,10 +3262,15 @@ public class KinesisIndexTaskTest extends EasyMockSupport
);
}
+ private void setLocalSupplier(KinesisRecordSupplier recordSupplier)
+ {
+ this.localSupplier = recordSupplier;
+ }
+
@Override
protected KinesisRecordSupplier newTaskRecordSupplier()
{
- return recordSupplier;
+ return localSupplier == null ? recordSupplier : localSupplier;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]