This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch remove_inc_publisher_context
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/remove_inc_publisher_context 
by this push:
     new 6b2dcfbe851 Remove IS_INCREMENTAL_HANDOFF_SUPPORTED context since 
incremental segment publishing has been the default.
6b2dcfbe851 is described below

commit 6b2dcfbe851f604d9131f0bc59ddcefe5f4df0de
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Tue Jun 10 22:09:03 2025 -0700

    Remove IS_INCREMENTAL_HANDOFF_SUPPORTED context since incremental segment 
publishing has been the default.
    
    This context was added for backwards compatibility for versions < 0.16.0. 
It's highly
    unlikely that folks will want to downgrade to something < 0.16.0. So this 
code cleans up the unnecessary
    context from the Kafka supervisor.
---
 .../druid/indexing/kafka/supervisor/KafkaSupervisor.java     |  4 ----
 .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 12 ++++++------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 743e4edc32c..82905b8eb92 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -231,10 +231,6 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     final String checkpoints = 
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
     final Map<String, Object> context = createBaseTaskContexts();
     context.put(CHECKPOINTS_CTX_KEY, checkpoints);
-    // Kafka index task always uses incremental handoff since 0.16.0.
-    // The below is for the compatibility when you want to downgrade your 
cluster to something earlier than 0.16.0.
-    // Kafka index task will pick up LegacyKafkaIndexTaskRunner without the 
below configuration.
-    context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
 
     List<SeekableStreamIndexTask<KafkaTopicPartition, Long, 
KafkaRecordEntity>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9428de463bf..f617ab07597 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import 
org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
@@ -63,6 +64,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@@ -485,10 +487,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testCreateBaseTaskContexts() throws JsonProcessingException
+  public void testGetTaskRunnerType() throws JsonProcessingException
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
-    final Map<String, Object> contexts = supervisor.createIndexTasks(
+    SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity> 
indexTask = supervisor.createIndexTasks(
         1,
         "seq",
         OBJECT_MAPPER,
@@ -531,10 +533,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null
         ),
         null
-    ).get(0).getContext();
-    final Boolean contextValue = (Boolean) 
contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
-    Assert.assertNotNull(contextValue);
-    Assert.assertTrue(contextValue);
+    ).get(0);
+    Assert.assertTrue(indexTask.getRunner() instanceof 
IncrementalPublishingKafkaIndexTaskRunner);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to