zachjsh commented on code in PR #16190:
URL: https://github.com/apache/druid/pull/16190#discussion_r1566575304


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata
+        && checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = 
((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != 
null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        boolean isMultiTopic = getIoConfig().isMultiTopic();
+        Pattern pattern = isMultiTopic ? 
Pattern.compile(getIoConfig().getStream()) : null;
+        
partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) 
-> {
+          final boolean match;
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          match = pattern != null
+              ? pattern.matcher(matchValue).matches()
+              : getIoConfig().getStream().equals(matchValue);
+
+          if (!match && !topicMisMatchLogged.contains(matchValue)) {

Review Comment:
   Simplified; let me know if this looks OK now.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata
+        && checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = 
((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != 
null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        boolean isMultiTopic = getIoConfig().isMultiTopic();
+        Pattern pattern = isMultiTopic ? 
Pattern.compile(getIoConfig().getStream()) : null;
+        
partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) 
-> {
+          final boolean match;
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          match = pattern != null
+              ? pattern.matcher(matchValue).matches()
+              : getIoConfig().getStream().equals(matchValue);
+
+          if (!match && !topicMisMatchLogged.contains(matchValue)) {
+            log.warn(
+                "Topic/stream in metadata storage [%s] doesn't match spec 
topic/stream [%s], ignoring stored sequences",
+                matchValue,
+                getIoConfig().getStream()
+            );
+            topicMisMatchLogged.add(matchValue);
+          }
+          if (match) {
+            if (isMultiTopic) {
+              partitionOffsets.put(new KafkaTopicPartition(true, matchValue, 
kafkaTopicPartition.partition()), value);
+            } else {
+              partitionOffsets.put(new KafkaTopicPartition(false, matchValue, 
kafkaTopicPartition.partition()), value);
+            }

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

Reply via email to