kfaraz commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1293285391


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -165,31 +193,50 @@ public Long 
getEarliestSequenceNumber(StreamPartition<Integer> partition)
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, 
OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> 
partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && 
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> 
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of 
topics", stream);
+      List<PartitionInfo> allPartitions;
+      if (multiTopic) {
+        Pattern pattern = Pattern.compile(stream);
+        allPartitions = consumer.listTopics()
+                                .entrySet()
+                                .stream()
+                                .filter(e -> 
pattern.matcher(e.getKey()).matches())
+                                .flatMap(e -> e.getValue().stream())
+                                .collect(Collectors.toList());
+        if (allPartitions.isEmpty()) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              
.ofCategory(DruidException.Category.INVALID_INPUT)
+                              .build("No partitions found for topics that 
match given pattern [%s]."
+                                     + "Check that the pattern regex is 
correct and matching topics exists", stream);
+        }
+      } else {
+        allPartitions = consumer.partitionsFor(stream);
+        if (allPartitions == null) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)

Review Comment:
   Maybe use `InvalidInput.exception` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to