This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch remove_inc_publisher_context
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/remove_inc_publisher_context
by this push:
new 6b2dcfbe851 Remove IS_INCREMENTAL_HANDOFF_SUPPORTED context since
incremental segment publishing has been the default.
6b2dcfbe851 is described below
commit 6b2dcfbe851f604d9131f0bc59ddcefe5f4df0de
Author: Abhishek Balaji Radhakrishnan <[email protected]>
AuthorDate: Tue Jun 10 22:09:03 2025 -0700
Remove IS_INCREMENTAL_HANDOFF_SUPPORTED context since incremental segment
publishing has been the default.
This context was added for backward compatibility with versions earlier than 0.16.0.
It is highly unlikely that anyone will want to downgrade to a version earlier than
0.16.0, so this change removes the unnecessary context from the Kafka supervisor.
---
.../druid/indexing/kafka/supervisor/KafkaSupervisor.java | 4 ----
.../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 12 ++++++------
2 files changed, 6 insertions(+), 10 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 743e4edc32c..82905b8eb92 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -231,10 +231,6 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
final String checkpoints =
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
- // Kafka index task always uses incremental handoff since 0.16.0.
- // The below is for the compatibility when you want to downgrade your
cluster to something earlier than 0.16.0.
- // Kafka index task will pick up LegacyKafkaIndexTaskRunner without the
below configuration.
- context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
List<SeekableStreamIndexTask<KafkaTopicPartition, Long,
KafkaRecordEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9428de463bf..f617ab07597 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import
org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
@@ -63,6 +64,7 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@@ -485,10 +487,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Test
- public void testCreateBaseTaskContexts() throws JsonProcessingException
+ public void testGetTaskRunnerType() throws JsonProcessingException
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
- final Map<String, Object> contexts = supervisor.createIndexTasks(
+ SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>
indexTask = supervisor.createIndexTasks(
1,
"seq",
OBJECT_MAPPER,
@@ -531,10 +533,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
),
null
- ).get(0).getContext();
- final Boolean contextValue = (Boolean)
contexts.get("IS_INCREMENTAL_HANDOFF_SUPPORTED");
- Assert.assertNotNull(contextValue);
- Assert.assertTrue(contextValue);
+ ).get(0);
+ Assert.assertTrue(indexTask.getRunner() instanceof
IncrementalPublishingKafkaIndexTaskRunner);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]