This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new 3e47fa3  Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest (#7264) (#7270)
3e47fa3 is described below

commit 3e47fa3146ffb5c7eb15ced63d67a4a57581cc23
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Mar 15 08:19:40 2019 -0700

    Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest (#7264) (#7270)
    
    * Fix testIncrementalHandOffReadsThroughEndOffsets in Kafka/KinesisIndexTaskTest
    
    * revert unnecessary change
    
    * fix test
    
    * remove debug log
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  74 ++++--
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 248 ++++++++++-----------
 2 files changed, 176 insertions(+), 146 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index d2f7373..a52dd80 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -87,6 +87,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -903,11 +904,11 @@ public class KafkaIndexTaskTest
     final SeekableStreamPartitions<Integer, Long> checkpoint1 =
         new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
     final SeekableStreamPartitions<Integer, Long> checkpoint2 =
-        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
+        new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L));
     final SeekableStreamPartitions<Integer, Long> endPartitions =
         new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 
Long.MAX_VALUE));
 
-    final KafkaIndexTask task = createTask(
+    final KafkaIndexTask normalReplica = createTask(
         null,
         new KafkaIndexTaskIOConfig(
             0,
@@ -922,34 +923,69 @@ public class KafkaIndexTaskTest
             false
         )
     );
-    final ListenableFuture<TaskStatus> future = runTask(task);
-    while (task.getRunner().getStatus() != Status.PAUSED) {
+    final KafkaIndexTask staleReplica = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+
+    final ListenableFuture<TaskStatus> normalReplicaFuture = 
runTask(normalReplica);
+    // Simulating one replica is slower than the other
+    final ListenableFuture<TaskStatus> staleReplicaFuture = 
ListenableFutures.transformAsync(
+        taskExec.submit(() -> {
+          Thread.sleep(1000);
+          return staleReplica;
+        }),
+        this::runTask
+    );
+
+    while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    Map<Integer, Long> currentOffsets = 
ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    staleReplica.getRunner().pause();
+    while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    Map<Integer, Long> currentOffsets = 
ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), 
currentOffsets);
 
-    // Simulating the case when another replica has consumed up to the offset 
of 8
-    task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
+    normalReplica.getRunner().setEndOffsets(currentOffsets, false);
+    staleReplica.getRunner().setEndOffsets(currentOffsets, false);
 
-    // The task is supposed to consume remaining rows up to the offset of 13
-    while (task.getRunner().getStatus() != Status.PAUSED) {
+    while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    currentOffsets = 
ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), 
currentOffsets);
+    currentOffsets = 
ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), 
currentOffsets);
 
-    task.getRunner().setEndOffsets(
-        ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
-        true
-    );
+    normalReplica.getRunner().setEndOffsets(currentOffsets, true);
+    staleReplica.getRunner().setEndOffsets(currentOffsets, true);
 
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, 
normalReplicaFuture.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, 
staleReplicaFuture.get().getStatusCode());
 
-    // processed count would be 8 if it stopped at it's current offsets
-    Assert.assertEquals(13, 
task.getRunner().getRowIngestionMeters().getProcessed());
-    Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getUnparseable());
-    Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(9, 
normalReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
+
+    Assert.assertEquals(9, 
staleReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
staleReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
staleReplica.getRunner().getRowIngestionMeters().getThrownAway());
   }
 
   @Test(timeout = 60_000L)
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index f60fe50..c4c3adb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -92,6 +92,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ListenableFutures;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -2558,6 +2559,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
     final KinesisIndexTask task = createTask(
         "task1",
+        DATA_SCHEMA,
         new KinesisIndexTaskIOConfig(
             null,
             "sequence0",
@@ -2617,125 +2619,147 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing 
should happen
     maxRowsPerSegment = 2;
-    maxRecordsPerPoll = 1;
 
-    recordSupplier.assign(anyObject());
+    final KinesisRecordSupplier recordSupplier1 = 
mock(KinesisRecordSupplier.class);
+    recordSupplier1.assign(anyObject());
     expectLastCall().anyTimes();
-
-    
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
+    
expect(recordSupplier1.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+    recordSupplier1.seek(anyObject(), anyString());
     expectLastCall().anyTimes();
-
-    expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
-                                          .once()
-                                          .andReturn(records.subList(4, 10))
-                                          .once()
-                                          .andReturn(records.subList(9, 15))
-                                          .once();
-
-    recordSupplier.close();
+    expect(recordSupplier1.poll(anyLong())).andReturn(records.subList(0, 5))
+                                           .once()
+                                           .andReturn(records.subList(4, 10))
+                                           .once();
+    recordSupplier1.close();
+    expectLastCall().once();
+    final KinesisRecordSupplier recordSupplier2 = 
mock(KinesisRecordSupplier.class);
+    recordSupplier2.assign(anyObject());
+    expectLastCall().anyTimes();
+    
expect(recordSupplier2.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+    recordSupplier2.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+    expect(recordSupplier2.poll(anyLong())).andReturn(records.subList(0, 5))
+                                           .once()
+                                           .andReturn(records.subList(4, 10))
+                                           .once();
+    recordSupplier2.close();
     expectLastCall().once();
 
     replayAll();
 
     final SeekableStreamPartitions<String, String> startPartitions = new 
SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "0"
-        )
+        ImmutableMap.of(shardId1, "0")
     );
 
     final SeekableStreamPartitions<String, String> checkpoint1 = new 
SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "4"
-        )
+        ImmutableMap.of(shardId1, "4")
     );
 
     final SeekableStreamPartitions<String, String> checkpoint2 = new 
SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "9"
-        )
+        ImmutableMap.of(shardId1, "9")
     );
 
     final SeekableStreamPartitions<String, String> endPartitions = new 
SeekableStreamPartitions<>(
         stream,
-        ImmutableMap.of(
-            shardId1,
-            "14"
-        )
+        ImmutableMap.of(shardId1, "100") // simulating unlimited
     );
-    final KinesisIndexTask task = createTask(
+    final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
         null,
-        new KinesisIndexTaskIOConfig(
-            null,
-            baseSequenceName,
-            startPartitions,
-            endPartitions,
-            true,
-            null,
-            null,
-            "awsEndpoint",
-            null,
-            null,
-            null,
-            null,
-            null,
-            false
-        )
+        baseSequenceName,
+        startPartitions,
+        endPartitions,
+        true,
+        null,
+        null,
+        "awsEndpoint",
+        null,
+        null,
+        null,
+        null,
+        null,
+        false
     );
-    final ListenableFuture<TaskStatus> future = runTask(task);
-    while (task.getRunner().getStatus() != 
SeekableStreamIndexTaskRunner.Status.PAUSED) {
+    final KinesisIndexTask normalReplica = createTask(
+        null,
+        DATA_SCHEMA,
+        ioConfig,
+        null
+    );
+    ((TestableKinesisIndexTask) 
normalReplica).setLocalSupplier(recordSupplier1);
+    final KinesisIndexTask staleReplica = createTask(
+        null,
+        DATA_SCHEMA,
+        ioConfig,
+        null
+    );
+    ((TestableKinesisIndexTask) 
staleReplica).setLocalSupplier(recordSupplier2);
+    final ListenableFuture<TaskStatus> normalReplicaFuture = 
runTask(normalReplica);
+    // Simulating one replica is slower than the other
+    final ListenableFuture<TaskStatus> staleReplicaFuture = 
ListenableFutures.transformAsync(
+        taskExec.submit(() -> {
+          Thread.sleep(1000);
+          return staleReplica;
+        }),
+        this::runTask
+    );
+
+    while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    staleReplica.getRunner().pause();
+    while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    Map<String, String> currentOffsets = 
ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Map<String, String> currentOffsets = 
ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), 
currentOffsets);
-    task.getRunner().setEndOffsets(currentOffsets, false);
 
-    // The task is supposed to consume remaining rows up to the offset of 13
-    while (task.getRunner().getStatus() != Status.PAUSED) {
+    normalReplica.getRunner().setEndOffsets(currentOffsets, false);
+    staleReplica.getRunner().setEndOffsets(currentOffsets, false);
+
+    while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    while (staleReplica.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    currentOffsets = 
ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), 
currentOffsets);
+    currentOffsets = 
ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), 
currentOffsets);
 
-    task.getRunner().setEndOffsets(
-        ImmutableMap.of(shardId1, 
String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) 
+ 1)),
-        true
-    );
+    normalReplica.getRunner().setEndOffsets(currentOffsets, true);
+    staleReplica.getRunner().setEndOffsets(currentOffsets, true);
 
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, 
normalReplicaFuture.get().getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, 
staleReplicaFuture.get().getStatusCode());
 
     verifyAll();
 
     Assert.assertEquals(2, checkpointRequestsHash.size());
 
     // Check metrics
-    Assert.assertEquals(12, 
task.getRunner().getRowIngestionMeters().getProcessed());
-    Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getUnparseable());
-    Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(10, 
normalReplica.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, 
normalReplica.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, 
normalReplica.getRunner().getRowIngestionMeters().getThrownAway());
 
     // Check published metadata
     final Set<SegmentDescriptor> descriptors = new HashSet<>();
-    descriptors.add(SD(task, "2008/P1D", 0));
-    descriptors.add(SD(task, "2008/P1D", 1));
-    descriptors.add(SD(task, "2009/P1D", 0));
-    descriptors.add(SD(task, "2010/P1D", 0));
-    descriptors.add(SD(task, "2010/P1D", 1));
-    descriptors.add(SD(task, "2011/P1D", 0));
-    descriptors.add(SD(task, "2011/P1D", 1));
-    descriptors.add(SD(task, "2012/P1D", 0));
-    descriptors.add(SD(task, "2013/P1D", 0));
+    descriptors.add(SD(normalReplica, "2008/P1D", 0));
+    descriptors.add(SD(normalReplica, "2009/P1D", 0));
+    descriptors.add(SD(normalReplica, "2010/P1D", 0));
+    descriptors.add(SD(normalReplica, "2010/P1D", 1));
+    descriptors.add(SD(normalReplica, "2011/P1D", 0));
+    descriptors.add(SD(normalReplica, "2011/P1D", 1));
+    descriptors.add(SD(normalReplica, "2012/P1D", 0));
+    descriptors.add(SD(normalReplica, "2013/P1D", 0));
     Assert.assertEquals(descriptors, publishedDescriptors());
     Assert.assertEquals(
         new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, 
ImmutableMap.of(
             shardId1,
-            "10"
+            "9"
         ))),
         
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
@@ -2792,26 +2816,27 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final KinesisIndexTaskIOConfig ioConfig
   )
   {
-    return createTask(taskId, DATA_SCHEMA, ioConfig);
+    return createTask(taskId, DATA_SCHEMA, ioConfig, null);
   }
 
   private KinesisIndexTask createTask(
       final String taskId,
-      final KinesisIndexTaskIOConfig ioConfig,
-      final Map<String, Object> context
+      final DataSchema dataSchema,
+      final KinesisIndexTaskIOConfig ioConfig
   )
   {
-    return createTask(taskId, DATA_SCHEMA, ioConfig, context);
+    return createTask(taskId, dataSchema, ioConfig, null);
   }
 
   private KinesisIndexTask createTask(
       final String taskId,
       final DataSchema dataSchema,
-      final KinesisIndexTaskIOConfig ioConfig
+      final KinesisIndexTaskIOConfig ioConfig,
+      @Nullable final Map<String, Object> context
   )
   {
     final KinesisIndexTaskTuningConfig tuningConfig = new 
KinesisIndexTaskTuningConfig(
-        1000,
+        maxRowsInMemory,
         null,
         maxRowsPerSegment,
         maxTotalRows,
@@ -2823,11 +2848,11 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         reportParseExceptions,
         handoffConditionTimeout,
         resetOffsetAutomatically,
-        skipAvailabilityCheck,
+        true,
+        null,
         null,
         null,
         null,
-        5000,
         null,
         null,
         logParseExceptions,
@@ -2836,58 +2861,20 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         maxRecordsPerPoll,
         intermediateHandoffPeriod
     );
-    final Map<String, Object> context = null;
-    final KinesisIndexTask task = new TestableKinesisIndexTask(
-        taskId,
-        null,
-        cloneDataSchema(dataSchema),
-        tuningConfig,
-        ioConfig,
-        context,
-        null,
-        null,
-        rowIngestionMetersFactory,
-        null
-    );
-
-    return task;
+    return createTask(taskId, dataSchema, ioConfig, tuningConfig, context);
   }
 
-
   private KinesisIndexTask createTask(
       final String taskId,
       final DataSchema dataSchema,
       final KinesisIndexTaskIOConfig ioConfig,
-      final Map<String, Object> context
+      final KinesisIndexTaskTuningConfig tuningConfig,
+      @Nullable final Map<String, Object> context
   )
   {
-    final KinesisIndexTaskTuningConfig tuningConfig = new 
KinesisIndexTaskTuningConfig(
-        maxRowsInMemory,
-        null,
-        maxRowsPerSegment,
-        maxTotalRows,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        true,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        true,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions,
-        maxRecordsPerPoll,
-        intermediateHandoffPeriod
-    );
-    context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, 
true);
+    if (context != null) {
+      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, 
true);
+    }
 
     final KinesisIndexTask task = new TestableKinesisIndexTask(
         taskId,
@@ -3245,8 +3232,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   @JsonTypeName("index_kinesis")
   private static class TestableKinesisIndexTask extends KinesisIndexTask
   {
+    private KinesisRecordSupplier localSupplier;
+
     @JsonCreator
-    public TestableKinesisIndexTask(
+    private TestableKinesisIndexTask(
         @JsonProperty("id") String id,
         @JsonProperty("resource") TaskResource taskResource,
         @JsonProperty("dataSchema") DataSchema dataSchema,
@@ -3273,10 +3262,15 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       );
     }
 
+    private void setLocalSupplier(KinesisRecordSupplier recordSupplier)
+    {
+      this.localSupplier = recordSupplier;
+    }
+
     @Override
     protected KinesisRecordSupplier newTaskRecordSupplier()
     {
-      return recordSupplier;
+      return localSupplier == null ? recordSupplier : localSupplier;
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to