This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7023b4500cf List diff instead of entire set for Kafka Supervisor
partition mismatch (#18234)
7023b4500cf is described below
commit 7023b4500cf43f6be7d7747fce85ab9c7ceb40e5
Author: Virushade <[email protected]>
AuthorDate: Wed Jul 23 10:58:17 2025 +0800
List diff instead of entire set for Kafka Supervisor partition mismatch
(#18234)
* Clearer lag metrics
* Reduce verbosity of Kafka mismatch log
* Log KafkaTopicPartition instead of integer
* Use object mapper to serialize set difference
* Change format argument to include error message
* Correctly describe writeValueAsString as serialize
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index c501454bd35..68bdb3fb405 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -265,12 +267,18 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
return null;
}
-    if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
-      log.warn(
-          "Kafka partitions[%s] do not match task partitions[%s]",
-          latestSequenceFromStream.keySet(),
-          highestCurrentOffsets.keySet()
-      );
+    Set<KafkaTopicPartition> kafkaPartitions = latestSequenceFromStream.keySet();
+    Set<KafkaTopicPartition> taskPartitions = highestCurrentOffsets.keySet();
+    if (!kafkaPartitions.equals(taskPartitions)) {
+      try {
+        log.warn("Mismatched kafka and task partitions: Missing Task Partitions %s, Missing Kafka Partitions %s",
+                 sortingMapper.writeValueAsString(Sets.difference(kafkaPartitions, taskPartitions)),
+                 sortingMapper.writeValueAsString(Sets.difference(taskPartitions, kafkaPartitions)));
+      }
+      catch (JsonProcessingException e) {
+        throw DruidException.defensive("Failed to serialize KafkaTopicPartition when getting partition record lag: %s",
+                                       e.getMessage());
+      }
}
return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]