This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new fcf56f2 Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward
compatibility (#8050)
fcf56f2 is described below
commit fcf56f23300a32f14a0e34ed2c330c490f95b867
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jul 10 08:29:37 2019 -0700
Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility (#8050)
* Add IS_INCREMENTAL_HANDOFF_SUPPORTED for KIS backward compatibility
* do it for kafka only
* fix test
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 4 ++
.../kafka/supervisor/KafkaSupervisorTest.java | 47 ++++++++++++++++++++++
.../kinesis/supervisor/KinesisSupervisorTest.java | 1 -
.../supervisor/SeekableStreamSupervisor.java | 3 +-
4 files changed, 53 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index cdf1336..c769617 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -240,6 +240,10 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<Integer, Long>
final String checkpoints =
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
+ // Since 0.16.0, the Kafka index task always uses incremental handoff.
+ // The flag below is set only for backward compatibility, in case the cluster is downgraded to a version earlier than 0.16.0.
+ // Without this setting, a Kafka index task running on such an older version would pick up LegacyKafkaIndexTaskRunner.
+ context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index aff5639..05242da 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.kafka.supervisor;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -256,6 +257,52 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Test
+ public void testCreateBaseTaskContexts() throws JsonProcessingException
+ {
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ final Map<String, Object> contexts = supervisor.createIndexTasks(
+ 1,
+ "seq",
+ objectMapper,
+ new TreeMap<>(),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "seq",
+ new SeekableStreamStartSequenceNumbers<>("test",
Collections.emptyMap(), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>("test",
Collections.emptyMap()),
+ Collections.emptyMap(),
+ null,
+ null,
+ null,
+ null
+ ),
+ new KafkaIndexTaskTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null
+ ).get(0).getContext();
+ final Boolean contextValue = (Boolean)
contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
+ Assert.assertNotNull(contextValue);
+ Assert.assertTrue(contextValue);
+ }
+
+ @Test
public void testNoInitialState() throws Exception
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 9b6f893..e6bb39c 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -202,7 +202,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor = null;
}
-
@Test
public void testNoInitialState() throws Exception
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 64112d6..78f9e2d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2799,7 +2799,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
- protected Map<String, Object> createBaseTaskContexts()
+ @VisibleForTesting
+ public Map<String, Object> createBaseTaskContexts()
{
final Map<String, Object> contexts = new HashMap<>();
if (spec.getContext() != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]