github-advanced-security[bot] commented on code in PR #19191:
URL: https://github.com/apache/druid/pull/19191#discussion_r2968214868
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
+ List<KafkaTopicPartition> partitions = new
ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than
partitions)
+ int numTasks = Math.min(backfillTaskCount, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d
partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1
: 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List<KafkaTopicPartition> taskPartitions =
partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
+ Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s],
setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getDataSchema(),
+ spec.getTuningConfig()
Review Comment:
## Deprecated method or constructor invocation
Invoking `KafkaSupervisorSpec.getTuningConfig` should be avoided because
it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10913)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
Review Comment:
## Deprecated method or constructor invocation
Invoking `SeekableStreamSupervisorSpec.getDataSchema` should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10910)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
Review Comment:
## Deprecated method or constructor invocation
Invoking `KafkaSupervisorSpec.getIoConfig` should be avoided because it
has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10911)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
+ List<KafkaTopicPartition> partitions = new
ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than
partitions)
+ int numTasks = Math.min(backfillTaskCount, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d
partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1
: 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List<KafkaTopicPartition> taskPartitions =
partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
+ Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s],
setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getDataSchema(),
Review Comment:
## Deprecated method or constructor invocation
Invoking `SeekableStreamSupervisorSpec.getDataSchema` should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10912)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
+ List<KafkaTopicPartition> partitions = new
ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than
partitions)
+ int numTasks = Math.min(backfillTaskCount, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d
partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1
: 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List<KafkaTopicPartition> taskPartitions =
partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
+ Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s],
setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getDataSchema(),
+ spec.getTuningConfig()
+ );
+
+ KafkaSupervisorIOConfig kafkaIoConfig = spec.getIoConfig();
+ KafkaIndexTaskIOConfig backfillIoConfig = new KafkaIndexTaskIOConfig(
+ taskNum, // taskGroupId
+ baseSequenceName,
+ null,
+ null,
+ new
SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(),
taskStartOffsets, Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(),
taskEndOffsets),
+ kafkaIoConfig.getConsumerProperties(),
+ kafkaIoConfig.getPollTimeout(),
+ false, // useTransaction = false for backfill (no supervisor
coordination)
+ null, // minimumMessageTime - no time filtering for backfill
+ null, // maximumMessageTime - no time filtering for backfill
+ kafkaIoConfig.getInputFormat(),
+ kafkaIoConfig.getConfigOverrides(),
+ kafkaIoConfig.isMultiTopic(),
+ null, // refreshRejectionPeriodsInMinutes - don't refresh
rejection periods for backfill
+ false // supervised = false
+ );
+
+ // Create backfill task with different supervisorId
+ String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
+ Map<String, Object> context = createBaseTaskContexts();
+ // Use APPEND locks to allow writing to intervals that may overlap
with main supervisor
+ context.put("useConcurrentLocks", true);
+
+ KafkaIndexTask backfillTask = new KafkaIndexTask(
+ taskId,
+ backfillSupervisorId, // Use backfill supervisorId instead of
spec.getId()
+ new TaskResource(baseSequenceName, 1),
+ spec.getDataSchema(),
Review Comment:
## Deprecated method or constructor invocation
Invoking `SeekableStreamSupervisorSpec.getDataSchema` should be avoided
because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10915)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
+ List<KafkaTopicPartition> partitions = new
ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than
partitions)
+ int numTasks = Math.min(backfillTaskCount, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d
partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1
: 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List<KafkaTopicPartition> taskPartitions =
partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
+ Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s],
setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getDataSchema(),
+ spec.getTuningConfig()
+ );
+
+ KafkaSupervisorIOConfig kafkaIoConfig = spec.getIoConfig();
+ KafkaIndexTaskIOConfig backfillIoConfig = new KafkaIndexTaskIOConfig(
+ taskNum, // taskGroupId
+ baseSequenceName,
+ null,
+ null,
+ new
SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(),
taskStartOffsets, Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(),
taskEndOffsets),
+ kafkaIoConfig.getConsumerProperties(),
+ kafkaIoConfig.getPollTimeout(),
+ false, // useTransaction = false for backfill (no supervisor
coordination)
+ null, // minimumMessageTime - no time filtering for backfill
+ null, // maximumMessageTime - no time filtering for backfill
+ kafkaIoConfig.getInputFormat(),
+ kafkaIoConfig.getConfigOverrides(),
+ kafkaIoConfig.isMultiTopic(),
+ null, // refreshRejectionPeriodsInMinutes - don't refresh
rejection periods for backfill
+ false // supervised = false
+ );
+
+ // Create backfill task with different supervisorId
+ String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
+ Map<String, Object> context = createBaseTaskContexts();
+ // Use APPEND locks to allow writing to intervals that may overlap
with main supervisor
+ context.put("useConcurrentLocks", true);
+
+ KafkaIndexTask backfillTask = new KafkaIndexTask(
+ taskId,
+ backfillSupervisorId, // Use backfill supervisorId instead of
spec.getId()
+ new TaskResource(baseSequenceName, 1),
+ spec.getDataSchema(),
+ spec.getTuningConfig(),
Review Comment:
## Deprecated method or constructor invocation
Invoking `KafkaSupervisorSpec.getTuningConfig` should be avoided because
it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10916)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -265,6 +268,131 @@
return taskList;
}
+ @Override
+ public void submitBackfillTask(
+ Map<KafkaTopicPartition, Long> startOffsets,
+ Map<KafkaTopicPartition, Long> endOffsets
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null
|| endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getDataSchema().getDataSource() +
"_backfill";
+
+ // Get the backfillTaskCount from config
+ int backfillTaskCount = spec.getIoConfig().getBackfillTaskCount();
+ List<KafkaTopicPartition> partitions = new
ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than
partitions)
+ int numTasks = Math.min(backfillTaskCount, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d
partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1
: 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List<KafkaTopicPartition> taskPartitions =
partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
+ Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s],
setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getDataSchema(),
+ spec.getTuningConfig()
+ );
+
+ KafkaSupervisorIOConfig kafkaIoConfig = spec.getIoConfig();
Review Comment:
## Deprecated method or constructor invocation
Invoking `KafkaSupervisorSpec.getIoConfig` should be avoided because it
has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/10914)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]