github-code-scanning[bot] commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1293334345


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -126,19 +126,25 @@
 
 
   @Override
-  protected RecordSupplier<Integer, Long, KafkaRecordEntity> 
setupRecordSupplier()
+  protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> 
setupRecordSupplier()
   {
     return new KafkaRecordSupplier(
         spec.getIoConfig().getConsumerProperties(),
         sortingMapper,
-        spec.getIoConfig().getConfigOverrides()
+        spec.getIoConfig().getConfigOverrides(),
+        spec.getIoConfig().isMultiTopic()
     );
   }
 
   @Override
-  protected int getTaskGroupIdForPartition(Integer partitionId)
+  protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId)
   {
-    return partitionId % spec.getIoConfig().getTaskCount();
+    Integer taskCount = spec.getIoConfig().getTaskCount();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [KafkaSupervisorSpec.getIoConfig](1) should be avoided because it 
has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5673)



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -165,31 +193,50 @@
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, 
OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> 
partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && 
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> 
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of 
topics", stream);
+      List<PartitionInfo> allPartitions;
+      if (multiTopic) {
+        Pattern pattern = Pattern.compile(stream);
+        allPartitions = consumer.listTopics()
+                                .entrySet()
+                                .stream()
+                                .filter(e -> 
pattern.matcher(e.getKey()).matches())
+                                .flatMap(e -> e.getValue().stream())
+                                .collect(Collectors.toList());
+        if (allPartitions.isEmpty()) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              
.ofCategory(DruidException.Category.INVALID_INPUT)
+                              .build("No partitions found for topics that 
match given pattern [%s]."
+                                     + "Check that the pattern regex is 
correct and matching topics exists", stream);
+        }
+      } else {
+        allPartitions = consumer.partitionsFor(stream);
+        if (allPartitions == null) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              
.ofCategory(DruidException.Category.INVALID_INPUT)
+                              .build("Topic [%s] is not found."
+                                     + "Check that the topic exists in Kafka 
cluster", stream);

Review Comment:
   ## Missing space in string literal
   
   This string appears to be missing a space after 'found.'.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5672)



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -165,31 +193,50 @@
   }
 
   @Override
-  public boolean isOffsetAvailable(StreamPartition<Integer> partition, 
OrderedSequenceNumber<Long> offset)
+  public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition> 
partition, OrderedSequenceNumber<Long> offset)
   {
     final Long earliestOffset = getEarliestSequenceNumber(partition);
     return earliestOffset != null
            && 
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
   }
 
   @Override
-  public Long getPosition(StreamPartition<Integer> partition)
+  public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   {
-    return wrapExceptions(() -> consumer.position(new TopicPartition(
-        partition.getStream(),
-        partition.getPartitionId()
-    )));
+    return wrapExceptions(() -> 
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
   @Override
-  public Set<Integer> getPartitionIds(String stream)
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      List<PartitionInfo> partitions = consumer.partitionsFor(stream);
-      if (partitions == null) {
-        throw new ISE("Topic [%s] is not found in KafkaConsumer's list of 
topics", stream);
+      List<PartitionInfo> allPartitions;
+      if (multiTopic) {
+        Pattern pattern = Pattern.compile(stream);

Review Comment:
   ## Regular expression injection
   
   This regular expression is constructed from a [user-provided value](1).
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5671)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to