This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch remove_inc_publisher_context in repository https://gitbox.apache.org/repos/asf/druid.git
commit 45472823d3f42f00109e57aa4a017afb21724a5a
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Tue Jun 10 22:09:03 2025 -0700

    Remove IS_INCREMENTAL_HANDOFF_SUPPORTED context since incremental segment publishing has been the default.

    This context was added for backwards compatibility for versions < 0.16.0.
    It's highly unlikely that folks will want to downgrade to something < 0.16.0.
    So this code cleans up the unnecessary context from the Kafka supervisor.
---
 .../druid/indexing/kafka/supervisor/KafkaSupervisor.java     |  4 ----
 .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 12 ++++++------
 2 files changed, 6 insertions(+), 10 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 743e4edc32c..82905b8eb92 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -231,10 +231,6 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
     final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
     final Map<String, Object> context = createBaseTaskContexts();
     context.put(CHECKPOINTS_CTX_KEY, checkpoints);
-    // Kafka index task always uses incremental handoff since 0.16.0.
-    // The below is for the compatibility when you want to downgrade your cluster to something earlier than 0.16.0.
-    // Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
-    context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
 
     List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9428de463bf..d0490328ff3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
@@ -63,6 +64,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@@ -485,10 +487,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @Test
-  public void testCreateBaseTaskContexts() throws JsonProcessingException
+  public void testGetTaskRunnerType() throws JsonProcessingException
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
-    final Map<String, Object> contexts = supervisor.createIndexTasks(
+    final SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity> indexTask = supervisor.createIndexTasks(
         1,
         "seq",
         OBJECT_MAPPER,
@@ -531,10 +533,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null
         ),
         null
-    ).get(0).getContext();
-    final Boolean contextValue = (Boolean) contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
-    Assert.assertNotNull(contextValue);
-    Assert.assertTrue(contextValue);
+    ).get(0);
+    Assert.assertTrue(indexTask.getRunner() instanceof IncrementalPublishingKafkaIndexTaskRunner);
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
