This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf56f2  Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility (#8050)
fcf56f2 is described below

commit fcf56f23300a32f14a0e34ed2c330c490f95b867
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jul 10 08:29:37 2019 -0700

    Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility (#8050)
    
    * Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility
    
    * do it for kafka only
    
    * fix test
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  4 ++
 .../kafka/supervisor/KafkaSupervisorTest.java      | 47 ++++++++++++++++++++++
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  1 -
 .../supervisor/SeekableStreamSupervisor.java       |  3 +-
 4 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cdf1336..c769617 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -240,6 +240,10 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
     final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
     final Map<String, Object> context = createBaseTaskContexts();
     context.put(CHECKPOINTS_CTX_KEY, checkpoints);
+    // Kafka index task always uses incremental handoff since 0.16.0.
+    // The below is for the compatibility when you want to downgrade your cluster to something earlier than 0.16.0.
+    // Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
+    context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index aff5639..05242da 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -256,6 +257,52 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @Test
+  public void testCreateBaseTaskContexts() throws JsonProcessingException
+  {
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    final Map<String, Object> contexts = supervisor.createIndexTasks(
+        1,
+        "seq",
+        objectMapper,
+        new TreeMap<>(),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "seq",
+            new SeekableStreamStartSequenceNumbers<>("test", Collections.emptyMap(), Collections.emptySet()),
+            new SeekableStreamEndSequenceNumbers<>("test", Collections.emptyMap()),
+            Collections.emptyMap(),
+            null,
+            null,
+            null,
+            null
+        ),
+        new KafkaIndexTaskTuningConfig(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ),
+        null
+    ).get(0).getContext();
+    final Boolean contextValue = (Boolean) contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
+    Assert.assertNotNull(contextValue);
+    Assert.assertTrue(contextValue);
+  }
+
+  @Test
   public void testNoInitialState() throws Exception
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 9b6f893..e6bb39c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -202,7 +202,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisor = null;
   }
 
-
   @Test
   public void testNoInitialState() throws Exception
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 64112d6..78f9e2d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2799,7 +2799,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
   }
 
-  protected Map<String, Object> createBaseTaskContexts()
+  @VisibleForTesting
+  public Map<String, Object> createBaseTaskContexts()
   {
     final Map<String, Object> contexts = new HashMap<>();
     if (spec.getContext() != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to