abhishekagarwal87 commented on code in PR #16190:
URL: https://github.com/apache/druid/pull/16190#discussion_r1545980952


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata
+        && checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        boolean isMultiTopic = getIoConfig().isMultiTopic();
+        Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null;
+        partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
+          final boolean match;
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          match = pattern != null
+              ? pattern.matcher(matchValue).matches()
+              : getIoConfig().getStream().equals(matchValue);
+
+          if (!match && !topicMisMatchLogged.contains(matchValue)) {
+            log.warn(

Review Comment:
   Could this happen when going from a multi-topic config to a single-topic
config? Will these bad offsets get cleared automatically?
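
   To make the scenario in this question concrete, here is a minimal,
self-contained sketch of the matching logic in the hunk above. It is not the
PR's code: `StoredOffsetFilterSketch`, `StoredOffset`, and `keepMatching` are
hypothetical stand-ins for `KafkaSupervisor`, a `KafkaTopicPartition` entry,
and the lambda body, and it assumes that non-matching entries are simply
skipped (the hunk is cut off at `log.warn(`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

public class StoredOffsetFilterSketch
{
  /** Hypothetical stand-in for one stored (KafkaTopicPartition, offset) entry. */
  public static final class StoredOffset
  {
    // Present only when the entry was written under a multi-topic config.
    public final Optional<String> topic;
    public final int partition;
    public final long offset;

    public StoredOffset(Optional<String> topic, int partition, long offset)
    {
      this.topic = topic;
      this.partition = partition;
      this.offset = offset;
    }
  }

  /**
   * Keeps only the stored entries whose topic matches the currently
   * configured stream. Assumption: mismatches are dropped, not repaired.
   */
  public static List<StoredOffset> keepMatching(
      List<StoredOffset> stored,
      String storedStream,      // stream name recorded with the stored offsets
      String configuredStream,  // topic name, or a regex when isMultiTopic is true
      boolean isMultiTopic
  )
  {
    final Pattern pattern = isMultiTopic ? Pattern.compile(configuredStream) : null;
    final List<StoredOffset> kept = new ArrayList<>();
    for (StoredOffset entry : stored) {
      // Multi-topic entries carry their own topic; single-topic entries
      // fall back to the stream name stored alongside them.
      final String matchValue = entry.topic.orElse(storedStream);
      final boolean match = pattern != null
                            ? pattern.matcher(matchValue).matches()
                            : configuredStream.equals(matchValue);
      if (match) {
        kept.add(entry);
      }
    }
    return kept;
  }
}
```

   With offsets stored for `metrics-a` and `metrics-b` under a multi-topic
pattern and a new single-topic config for `metrics-a`, only the `metrics-a`
entry survives the filter. The sketch does not show whether the dropped
entries are ever removed from metadata storage, which is the open question.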



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()

Review Comment:
   Alternatively, rename this method so that callers know that it also
filters out the spurious stored offsets.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()

Review Comment:
   Instead of overriding this method, you should just override the 
`checkSourceMetadataMatch` method. 



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()

Review Comment:
   Doesn't this fail when `updateDataSourceMetadataWithHandle` is called later
on, since that too will match the committed metadata against the new metadata?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

