This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7023b4500cf List diff instead of entire set for Kafka Supervisor partition mismatch (#18234)
7023b4500cf is described below

commit 7023b4500cf43f6be7d7747fce85ab9c7ceb40e5
Author: Virushade <[email protected]>
AuthorDate: Wed Jul 23 10:58:17 2025 +0800

    List diff instead of entire set for Kafka Supervisor partition mismatch (#18234)
    
    * Clearer lag metrics
    
    * Reduce verbosity of Kafka mismatch log
    
    * Log KafkaTopicPartition instead of integer
    
    * Use object mapper to serialize set difference
    
    * Change format argument to include error message
    
    * Correctly describe writeValueAsString as serialize
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java   | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index c501454bd35..68bdb3fb405 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -265,12 +267,18 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartition
       return null;
     }
 
-    if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {
-      log.warn(
-          "Kafka partitions[%s] do not match task partitions[%s]",
-          latestSequenceFromStream.keySet(),
-          highestCurrentOffsets.keySet()
-      );
+    Set<KafkaTopicPartition> kafkaPartitions = latestSequenceFromStream.keySet();
+    Set<KafkaTopicPartition> taskPartitions = highestCurrentOffsets.keySet();
+    if (!kafkaPartitions.equals(taskPartitions)) {
+      try {
+        log.warn("Mismatched kafka and task partitions: Missing Task Partitions %s, Missing Kafka Partitions %s",
+                 sortingMapper.writeValueAsString(Sets.difference(kafkaPartitions, taskPartitions)),
+                 sortingMapper.writeValueAsString(Sets.difference(taskPartitions, kafkaPartitions)));
+      }
+      catch (JsonProcessingException e) {
+        throw DruidException.defensive("Failed to serialize 
KafkaTopicPartition when getting partition record lag: %s",
+                                       e.getMessage());
+      }
     }
 
     return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to