kfaraz commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1281368803


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -162,31 +189,39 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition)
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", stream);
+      List<PartitionInfo> allPartitions = new ArrayList<>();
+      for (String topic : stream.split(",")) {
+        if (!multiTopic && !allPartitions.isEmpty()) {
+          throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled "

Review Comment:
   Nit: this validation should be done once, before the loop, using 
`!multiTopic && stream.contains(",")`.
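
   A minimal sketch of the suggested ordering, not the actual patch. The class `TopicListValidation` and the method `resolveTopics` are hypothetical names used only for illustration, `IllegalArgumentException` stands in for `InvalidInput.exception`, and the error text is a placeholder because the real message is cut off in the hunk above.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicListValidation
{
  // Hypothetical helper for illustration; not part of KafkaRecordSupplier.
  static List<String> resolveTopics(String stream, boolean multiTopic)
  {
    // Validate once, up front, instead of checking loop state on every iteration.
    if (!multiTopic && stream.contains(",")) {
      // Stand-in for InvalidInput.exception(...); the message text is a placeholder.
      throw new IllegalArgumentException(
          String.format("Comma separated list of topics [%s] requires multi-topic mode", stream)
      );
    }

    // After the check, the loop only has to collect topics.
    List<String> topics = new ArrayList<>();
    for (String topic : stream.split(",")) {
      topics.add(topic);
    }
    return topics;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTopics("events", false));
    System.out.println(resolveTopics("events,clicks", true));
  }
}
```

   With the check hoisted, a comma-separated list is rejected before any topic is processed, and the loop body no longer depends on whether `allPartitions` happens to be empty.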



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

