github-code-scanning[bot] commented on code in PR #14424:
URL: https://github.com/apache/druid/pull/14424#discussion_r1293334345
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -126,19 +126,25 @@
@Override
- protected RecordSupplier<Integer, Long, KafkaRecordEntity>
setupRecordSupplier()
+ protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity>
setupRecordSupplier()
{
return new KafkaRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
- spec.getIoConfig().getConfigOverrides()
+ spec.getIoConfig().getConfigOverrides(),
+ spec.getIoConfig().isMultiTopic()
);
}
@Override
- protected int getTaskGroupIdForPartition(Integer partitionId)
+ protected int getTaskGroupIdForPartition(KafkaTopicPartition partitionId)
{
- return partitionId % spec.getIoConfig().getTaskCount();
+ Integer taskCount = spec.getIoConfig().getTaskCount();
Review Comment:
## Deprecated method or constructor invocation
Invoking [KafkaSupervisorSpec.getIoConfig](1) should be avoided because it
has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/5673)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -165,31 +193,50 @@
}
@Override
- public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition>
partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
- public Long getPosition(StreamPartition<Integer> partition)
+ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
{
- return wrapExceptions(() -> consumer.position(new TopicPartition(
- partition.getStream(),
- partition.getPartitionId()
- )));
+ return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
@Override
- public Set<Integer> getPartitionIds(String stream)
+ public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- List<PartitionInfo> partitions = consumer.partitionsFor(stream);
- if (partitions == null) {
- throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", stream);
+ List<PartitionInfo> allPartitions;
+ if (multiTopic) {
+ Pattern pattern = Pattern.compile(stream);
+ allPartitions = consumer.listTopics()
+ .entrySet()
+ .stream()
+ .filter(e ->
pattern.matcher(e.getKey()).matches())
+ .flatMap(e -> e.getValue().stream())
+ .collect(Collectors.toList());
+ if (allPartitions.isEmpty()) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+
.ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("No partitions found for topics that
match given pattern [%s]."
+ + "Check that the pattern regex is
correct and matching topics exists", stream);
+ }
+ } else {
+ allPartitions = consumer.partitionsFor(stream);
+ if (allPartitions == null) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+
.ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Topic [%s] is not found."
+ + "Check that the topic exists in Kafka
cluster", stream);
Review Comment:
## Missing space in string literal
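This string appears to be missing a space after 'found.': the two concatenated
literals produce "Topic [%s] is not found.Check that the topic exists in Kafka
cluster". The multi-topic message earlier in this hunk has the same problem
after 'pattern [%s].'.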
[Show more details](https://github.com/apache/druid/security/code-scanning/5672)
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -165,31 +193,50 @@
}
@Override
- public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ public boolean isOffsetAvailable(StreamPartition<KafkaTopicPartition>
partition, OrderedSequenceNumber<Long> offset)
{
final Long earliestOffset = getEarliestSequenceNumber(partition);
return earliestOffset != null
&&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
}
@Override
- public Long getPosition(StreamPartition<Integer> partition)
+ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
{
- return wrapExceptions(() -> consumer.position(new TopicPartition(
- partition.getStream(),
- partition.getPartitionId()
- )));
+ return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
@Override
- public Set<Integer> getPartitionIds(String stream)
+ public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- List<PartitionInfo> partitions = consumer.partitionsFor(stream);
- if (partitions == null) {
- throw new ISE("Topic [%s] is not found in KafkaConsumer's list of
topics", stream);
+ List<PartitionInfo> allPartitions;
+ if (multiTopic) {
+ Pattern pattern = Pattern.compile(stream);
Review Comment:
## Regular expression injection
This regular expression is constructed from a [user-provided value](1): the
`stream` argument is passed to `Pattern.compile` as-is.
[Show more details](https://github.com/apache/druid/security/code-scanning/5671)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]