turcsanyip commented on code in PR #10538:
URL: https://github.com/apache/nifi/pull/10538#discussion_r2546451824
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##########
@@ -238,6 +240,14 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
+ Integer partition = pollingContext.getPartition();
+ if (partition != null) {
+ List<TopicPartition> topicPartitions = pollingContext.getTopics().stream()
+ .map(topic -> new TopicPartition(topic, partition))
+ .collect(Collectors.toList());
+ consumer.assign(topicPartitions);
+ }
+
Review Comment:
I suggest moving this logic to `Kafka3ConsumerService`'s constructor. That way, the
assign/subscribe logic would be more straightforward: a single `if-else`
instead of separate `if` statements at different points in the code.
The partition id can be passed in the `Subscription` parameter.
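For illustration, a minimal sketch of what a single `if-else` in the constructor could look like. Everything here is hypothetical: `Subscription` is a local record carrying the topics plus an optional partition id (the real NiFi `Subscription` class may look different), `ConsumerStub` stands in for the `assign`/`subscribe` subset of Kafka's `Consumer`, and `TopicPartition` is a local stand-in so the example runs without kafka-clients.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class AssignOrSubscribeSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
    record TopicPartition(String topic, int partition) { }

    // Hypothetical subscription parameter: topics plus an optional partition id (null = group-managed)
    record Subscription(List<String> topics, Integer partition) { }

    // Hypothetical stand-in for the assign/subscribe subset of Kafka's Consumer interface
    interface ConsumerStub {
        void assign(Collection<TopicPartition> partitions);
        void subscribe(Collection<String> topics);
    }

    // Hypothetical consumer service: the constructor decides once between static assignment and subscription
    static class ConsumerServiceSketch {
        ConsumerServiceSketch(final ConsumerStub consumer, final Subscription subscription) {
            final Integer partition = subscription.partition();
            if (partition != null) {
                // Static partition mapping: pin this consumer to the same partition id of every topic
                final List<TopicPartition> topicPartitions = subscription.topics().stream()
                        .map(topic -> new TopicPartition(topic, partition))
                        .collect(Collectors.toList());
                consumer.assign(topicPartitions);
            } else {
                // No explicit partition: let the consumer group coordinator balance the partitions
                consumer.subscribe(subscription.topics());
            }
        }
    }

    // Records which call the service made, for demonstration only
    static class RecordingConsumer implements ConsumerStub {
        final List<TopicPartition> assigned = new ArrayList<>();
        final List<String> subscribed = new ArrayList<>();

        public void assign(Collection<TopicPartition> partitions) { assigned.addAll(partitions); }

        public void subscribe(Collection<String> topics) { subscribed.addAll(topics); }
    }

    public static void main(String[] args) {
        RecordingConsumer pinned = new RecordingConsumer();
        new ConsumerServiceSketch(pinned, new Subscription(List.of("a", "b"), 2));
        System.out.println(pinned.assigned);

        RecordingConsumer grouped = new RecordingConsumer();
        new ConsumerServiceSketch(grouped, new Subscription(List.of("a"), null));
        System.out.println(grouped.subscribed);
    }
}
```

Keeping both branches in one place means the consumer is fully configured when the service object exists, so nothing later in the polling path has to re-check whether it was assigned or subscribed.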
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
Review Comment:
In the case of static partition mapping, the consumers should be created eagerly,
one for each partition in `availablePartitionedPollingContexts` (similar to
[recreateAssignedConsumers()](https://github.com/apache/nifi/blob/support/nifi-1.x/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java#L502)
in the old 2.6 implementation).
Without that, a consumer is created only for the first partition, and consumers
for the other partitions are created only when parallel task execution triggers it.
This is because a consumer is always available in the pool unless multiple threads
request consumers at the same time, which is not guaranteed to happen (e.g. with
Concurrent Tasks = 1).
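A rough sketch of the eager approach, assuming the partitioned polling contexts are already queued in `availablePartitionedPollingContexts` when the processor is scheduled. `PollingContext` and `ConsumerService` are hypothetical stand-ins, and `createConsumersEagerly`, `borrow` and `giveBack` are illustrative names, not the PR's or the 2.6 `ConsumerPool`'s actual methods.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class EagerPartitionConsumers {

    // Hypothetical stand-ins for PollingContext and KafkaConsumerService
    record PollingContext(List<String> topics, int partition) { }
    record ConsumerService(PollingContext context) { }

    private final Queue<PollingContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
    private final Queue<ConsumerService> consumerServices = new LinkedBlockingQueue<>();
    private final AtomicInteger activeConsumerCount = new AtomicInteger();

    // Hypothetical hook called once on scheduling, with one polling context per statically assigned partition
    void createConsumersEagerly(List<PollingContext> partitionedContexts,
                                Function<PollingContext, ConsumerService> factory) {
        availablePartitionedPollingContexts.addAll(partitionedContexts);
        PollingContext context;
        // Drain every partition context so each partition gets a consumer up front,
        // regardless of how many tasks later run concurrently
        while ((context = availablePartitionedPollingContexts.poll()) != null) {
            consumerServices.offer(factory.apply(context));
            activeConsumerCount.incrementAndGet();
        }
    }

    // Tasks take the consumer at the head of the FIFO pool...
    ConsumerService borrow() {
        return consumerServices.poll();
    }

    // ...and return it to the tail, so a single task cycles through all partitions
    void giveBack(ConsumerService service) {
        consumerServices.offer(service);
    }

    int consumerCount() {
        return activeConsumerCount.get();
    }

    public static void main(String[] args) {
        EagerPartitionConsumers pool = new EagerPartitionConsumers();
        pool.createConsumersEagerly(List.of(
                new PollingContext(List.of("orders"), 0),
                new PollingContext(List.of("orders"), 1),
                new PollingContext(List.of("orders"), 2)), ConsumerService::new);
        System.out.println(pool.consumerCount()); // 3
        // Simulates Concurrent Tasks = 1: one thread still visits every partition in turn
        for (int i = 0; i < 4; i++) {
            ConsumerService service = pool.borrow();
            System.out.println(service.context().partition()); // 0, 1, 2, 0
            pool.giveBack(service);
        }
    }
}
```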
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -334,6 +342,9 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
private final AtomicInteger activeConsumerCount = new AtomicInteger();
+ private final Queue<PollingContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
+ private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new HashMap<>();
Review Comment:
The map can be accessed by parallel threads, so it should be a
`ConcurrentHashMap` or similar.
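A small sketch of the suggested change, using hypothetical stand-in records for `KafkaConsumerService` and `PollingContext`; `register` and `release` are illustrative method names. Besides being safe for concurrent `put`/`get`, `ConcurrentHashMap.remove(key)` is atomic, so if two threads try to release the same consumer, only one of them gets the polling context back.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentMappingSketch {

    // Hypothetical stand-ins for KafkaConsumerService and PollingContext
    record ConsumerService(int id) { }
    record PollingContext(int partition) { }

    // Safe for concurrent access from multiple onTrigger threads, unlike a plain HashMap
    private final Map<ConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new ConcurrentHashMap<>();

    void register(ConsumerService service, PollingContext context) {
        consumerServiceToPartitionedPollingContext.put(service, context);
    }

    // Atomic remove: at most one caller receives the context for a given service
    PollingContext release(ConsumerService service) {
        return consumerServiceToPartitionedPollingContext.remove(service);
    }

    int size() {
        return consumerServiceToPartitionedPollingContext.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentMappingSketch sketch = new ConcurrentMappingSketch();
        ExecutorService executor = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            final int id = i;
            executor.submit(() -> sketch.register(new ConsumerService(id), new PollingContext(id % 4)));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(sketch.size()); // 1000
    }
}
```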
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -379,6 +437,37 @@ public void onScheduled(final ProcessContext context) {
if (maxUncommittedSizeConfigured) {
maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
}
+
+ if (ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties())) {
+ final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+ final int partitionCount = getPartitionCount(connectionService);
+
+ if (partitionCount != numAssignedPartitions) {
+ context.yield();
+
+ KafkaConsumerService service;
+ while ((service = consumerServices.poll()) != null) {
+ close(service, "Not all partitions are assigned");
+ }
Review Comment:
I think this can never happen, because no consumers have been created at this point.
--
This is an automated message from the Apache Git Service.