abhishekrb19 commented on code in PR #16190:
URL: https://github.com/apache/druid/pull/16190#discussion_r1562793256


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata
+        && checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        boolean isMultiTopic = getIoConfig().isMultiTopic();
+        Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null;
+        partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
+          final boolean match;
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          match = pattern != null
+              ? pattern.matcher(matchValue).matches()
+              : getIoConfig().getStream().equals(matchValue);
+
+          if (!match && !topicMisMatchLogged.contains(matchValue)) {
+            log.warn(

Review Comment:
   Yeah, having the user explicitly reset the offsets in this scenario, when 
there's no match at all, makes sense to me.
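
   For reference, the match decision in the hunk above can be sketched in isolation. This is only an illustration under assumptions, not the PR's code: `TopicMatchSketch` and `topicMatches` are hypothetical names, the `Optional<String>` argument stands in for `kafkaTopicPartition.topic()`, and `storedStream` stands in for `partitions.getStream()`.

```java
import java.util.Optional;
import java.util.regex.Pattern;

public class TopicMatchSketch
{
  // Hypothetical helper (not part of Druid): decides whether a stored offset
  // belongs to the currently configured stream.
  static boolean topicMatches(
      String configuredStream,          // stands in for getIoConfig().getStream()
      boolean isMultiTopic,             // stands in for getIoConfig().isMultiTopic()
      Optional<String> partitionTopic,  // stands in for kafkaTopicPartition.topic()
      String storedStream               // stands in for partitions.getStream()
  )
  {
    // Offsets stored by a multi-topic config carry their own topic name;
    // offsets stored by a single-topic config fall back to the stored stream.
    final String matchValue = partitionTopic.orElse(storedStream);

    if (isMultiTopic) {
      // In multi-topic mode the configured stream is treated as a regex
      // that must match the whole topic name.
      return Pattern.compile(configuredStream).matcher(matchValue).matches();
    }
    // In single-topic mode the names must be equal.
    return configuredStream.equals(matchValue);
  }

  public static void main(String[] args)
  {
    // Multi-topic pattern against an offset stored with its own topic name.
    System.out.println(topicMatches("metrics-.*", true, Optional.of("metrics-eu"), "unused"));
    // Single-topic config against an offset stored by a single-topic config.
    System.out.println(topicMatches("metrics", false, Optional.empty(), "metrics"));
    // Single-topic config against an offset stored for a different topic.
    System.out.println(topicMatches("metrics", false, Optional.of("logs"), "unused"));
  }
}
```

   If every stored partition fails this check, no previous offset applies to the new config, which is the "no match at all" case where an explicit reset by the user is the safer behavior.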



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

