zachjsh commented on code in PR #16190: URL: https://github.com/apache/druid/pull/16190#discussion_r1566575304
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() + { + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + if (dataSourceMetadata instanceof KafkaDataSourceMetadata + && checkSourceMetadataMatch(dataSourceMetadata)) { + @SuppressWarnings("unchecked") + SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata) + .getSeekableStreamSequenceNumbers(); + if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) { + Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>(); + Set<String> topicMisMatchLogged = new HashSet<>(); + boolean isMultiTopic = getIoConfig().isMultiTopic(); + Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null; + partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> { + final boolean match; + final String matchValue; + // previous offsets are from multi-topic config + if (kafkaTopicPartition.topic().isPresent()) { + matchValue = kafkaTopicPartition.topic().get(); + } else { + // previous offsets are from single topic config + matchValue = partitions.getStream(); + } + + match = pattern != null + ? pattern.matcher(matchValue).matches() + : getIoConfig().getStream().equals(matchValue); + + if (!match && !topicMisMatchLogged.contains(matchValue)) { Review Comment: simplified, let me know if ok now. 
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() + { + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + if (dataSourceMetadata instanceof KafkaDataSourceMetadata + && checkSourceMetadataMatch(dataSourceMetadata)) { + @SuppressWarnings("unchecked") + SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata) + .getSeekableStreamSequenceNumbers(); + if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) { + Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>(); + Set<String> topicMisMatchLogged = new HashSet<>(); + boolean isMultiTopic = getIoConfig().isMultiTopic(); + Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null; + partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> { + final boolean match; + final String matchValue; + // previous offsets are from multi-topic config + if (kafkaTopicPartition.topic().isPresent()) { + matchValue = kafkaTopicPartition.topic().get(); + } else { + // previous offsets are from single topic config + matchValue = partitions.getStream(); + } + + match = pattern != null + ? 
pattern.matcher(matchValue).matches() + : getIoConfig().getStream().equals(matchValue); + + if (!match && !topicMisMatchLogged.contains(matchValue)) { + log.warn( + "Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences", + matchValue, + getIoConfig().getStream() + ); + topicMisMatchLogged.add(matchValue); + } + if (match) { + if (isMultiTopic) { + partitionOffsets.put(new KafkaTopicPartition(true, matchValue, kafkaTopicPartition.partition()), value); + } else { + partitionOffsets.put(new KafkaTopicPartition(false, matchValue, kafkaTopicPartition.partition()), value); + } Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org