abhishekagarwal87 commented on code in PR #16190: URL: https://github.com/apache/druid/pull/16190#discussion_r1545980952
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() + { + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + if (dataSourceMetadata instanceof KafkaDataSourceMetadata + && checkSourceMetadataMatch(dataSourceMetadata)) { + @SuppressWarnings("unchecked") + SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata) + .getSeekableStreamSequenceNumbers(); + if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) { + Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>(); + Set<String> topicMisMatchLogged = new HashSet<>(); + boolean isMultiTopic = getIoConfig().isMultiTopic(); + Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null; + partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> { + final boolean match; + final String matchValue; + // previous offsets are from multi-topic config + if (kafkaTopicPartition.topic().isPresent()) { + matchValue = kafkaTopicPartition.topic().get(); + } else { + // previous offsets are from single topic config + matchValue = partitions.getStream(); + } + + match = pattern != null + ? pattern.matcher(matchValue).matches() + : getIoConfig().getStream().equals(matchValue); + + if (!match && !topicMisMatchLogged.contains(matchValue)) { + log.warn( Review Comment: This could happen when going from multi-topic to single-topic? Will these bad offsets get cleared automatically? 
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() Review Comment: Or rename this method appropriately so that callers know that it's also filtering the spurious stored offsets. ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() Review Comment: Instead of overriding this method, you should just override the `checkSourceMetadataMatch` method. ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() Review Comment: Doesn't this fail when `updateDataSourceMetadataWithHandle` is called later on, since that too will match the committed metadata against the new metadata? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org