turcsanyip commented on code in PR #10538:
URL: https://github.com/apache/nifi/pull/10538#discussion_r2546451824


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##########
@@ -238,6 +240,14 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte
         final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
         final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
 
+        Integer partition = pollingContext.getPartition();
+        if (partition != null) {
+            List<TopicPartition> topicPartitions = pollingContext.getTopics().stream()
+                    .map(topic -> new TopicPartition(topic, partition))
+                    .collect(Collectors.toList());
+            consumer.assign(topicPartitions);
+        }
+

Review Comment:
   I suggest moving it to `Kafka3ConsumerService`'s constructor. That way, the 
assign/subscribe logic would be more straightforward (a single `if-else` 
instead of the separate `if` statements at different points in the code).
   The partition id can be passed in the `Subscription` parameter.
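A minimal sketch of the suggested single `if-else`, assuming the `Subscription` parameter gains an optional partition id (`null` meaning a group-managed subscription). `Subscription`, `TopicPartitionRef`, `SubscribableConsumer`, `ConsumerServiceSketch` and `RecordingConsumer` are hypothetical stand-ins so the example compiles without kafka-clients. In the real code the two branches would call `assign(...)` and `subscribe(...)` on the `KafkaConsumer`, and the actual class shapes may differ.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for Kafka's TopicPartition (topic name + partition id).
record TopicPartitionRef(String topic, int partition) { }

// Hypothetical stand-in for the two Consumer methods involved here.
interface SubscribableConsumer {
    void assign(Collection<TopicPartitionRef> partitions);
    void subscribe(Collection<String> topics);
}

// Hypothetical Subscription carrying an optional partition id (null = group-managed).
record Subscription(List<String> topics, Integer partition) { }

final class ConsumerServiceSketch {
    private final SubscribableConsumer consumer; // retained for later poll() calls

    ConsumerServiceSketch(final SubscribableConsumer consumer, final Subscription subscription) {
        this.consumer = consumer;
        // Single decision point: static assignment when a partition is given,
        // consumer-group subscription otherwise.
        final Integer partition = subscription.partition();
        if (partition != null) {
            final List<TopicPartitionRef> topicPartitions = subscription.topics().stream()
                    .map(topic -> new TopicPartitionRef(topic, partition))
                    .toList();
            consumer.assign(topicPartitions);
        } else {
            consumer.subscribe(subscription.topics());
        }
    }
}

// Test double that records which method the constructor called.
final class RecordingConsumer implements SubscribableConsumer {
    final List<String> calls = new ArrayList<>();

    @Override
    public void assign(final Collection<TopicPartitionRef> partitions) {
        calls.add("assign " + partitions);
    }

    @Override
    public void subscribe(final Collection<String> topics) {
        calls.add("subscribe " + topics);
    }
}
```

With this shape, `getConsumerService` would only build the `Subscription`, and the decision between `assign` and `subscribe` lives in one place.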



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########


Review Comment:
   In case of static partition mapping, the consumers should be created eagerly 
for each partition in `availablePartitionedPollingContexts` (similar to 
[recreateAssignedConsumers()](https://github.com/apache/nifi/blob/support/nifi-1.x/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java#L502) in the old 2.6 implementation).
   
   Without that, only one consumer is created at first (for the first
partition), and consumers for the other partitions are created only when
parallel task execution triggers it. This is because a pooled consumer is
always available until multiple threads request consumers at the same time,
which is not guaranteed to happen (e.g. with Concurrent Tasks = 1).
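A sketch of the eager approach next to the lazy behaviour described above. `PartitionedContext` and `ConsumerHandle` are hypothetical records standing in for the partitioned `PollingContext` and `KafkaConsumerService`, and `createAssignedConsumers` is a hypothetical method loosely modelled on the 2.6 `recreateAssignedConsumers()`. With a single task that borrows and returns a consumer repeatedly, the lazy path only ever creates the consumer for the first partition.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical stand-in for a PollingContext bound to a single partition.
record PartitionedContext(String topic, int partition) { }

// Hypothetical stand-in for a KafkaConsumerService created for one context.
record ConsumerHandle(PartitionedContext context) { }

final class PartitionConsumerPool {
    final Queue<PartitionedContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
    final Queue<ConsumerHandle> consumerServices = new LinkedBlockingQueue<>();
    final Map<ConsumerHandle, PartitionedContext> consumerToContext = new ConcurrentHashMap<>();

    // Eager: create one consumer per assigned partition up front and put all
    // of them into the pool (hypothetical; intended to run from onScheduled).
    void createAssignedConsumers(final Function<PartitionedContext, ConsumerHandle> factory) {
        PartitionedContext context;
        while ((context = availablePartitionedPollingContexts.poll()) != null) {
            final ConsumerHandle consumer = factory.apply(context);
            consumerToContext.put(consumer, context);
            consumerServices.offer(consumer);
        }
    }

    // Lazy: a new consumer is created only when the pool happens to be empty
    // at the moment a task asks for one.
    ConsumerHandle borrow(final Function<PartitionedContext, ConsumerHandle> factory) {
        final ConsumerHandle pooled = consumerServices.poll();
        if (pooled != null) {
            return pooled;
        }
        final PartitionedContext context = availablePartitionedPollingContexts.poll();
        if (context == null) {
            return null; // every partition already has a consumer and all are in use
        }
        final ConsumerHandle consumer = factory.apply(context);
        consumerToContext.put(consumer, context);
        return consumer;
    }

    // Return a borrowed consumer to the pool once the task is done with it.
    void giveBack(final ConsumerHandle consumer) {
        consumerServices.offer(consumer);
    }
}
```

Calling something like `createAssignedConsumers` from `onScheduled` would make every partition's consumer available in the pool before the first `onTrigger`, regardless of the Concurrent Tasks setting.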



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -334,6 +342,9 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
     private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
     private final AtomicInteger activeConsumerCount = new AtomicInteger();
 
+    private final Queue<PollingContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
+    private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new HashMap<>();

Review Comment:
   The map can be accessed by parallel threads, so it should be a
`ConcurrentHashMap` or similar.
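A small sketch of why a concurrent map matters here: several tasks register consumer entries at the same time. `ConsumerContextRegistry` is a hypothetical class; plain `Object` keys stand in for `KafkaConsumerService` instances and `Integer` values for the partitioned `PollingContext`. A plain `HashMap` is not thread-safe, so concurrent `put` calls like these can lose entries or corrupt its internal state, whereas `ConcurrentHashMap` makes each `put`/`remove` safe without external locking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConsumerContextRegistry {
    // Shared by all concurrent tasks, hence ConcurrentHashMap instead of HashMap.
    private final Map<Object, Integer> consumerToPartition = new ConcurrentHashMap<>();

    void register(final Object consumer, final int partition) {
        consumerToPartition.put(consumer, partition);
    }

    // Removes the entry and returns the partition it was mapped to (or null).
    Integer release(final Object consumer) {
        return consumerToPartition.remove(consumer);
    }

    int size() {
        return consumerToPartition.size();
    }

    // Submits `tasks` registrations to a thread pool and returns the final map size.
    static int registerInParallel(final int tasks) {
        final ConsumerContextRegistry registry = new ConsumerContextRegistry();
        final ExecutorService executor = Executors.newFixedThreadPool(8);
        try {
            for (int i = 0; i < tasks; i++) {
                final int partition = i;
                // Each task uses a distinct key, like distinct consumer services.
                executor.submit(() -> registry.register(new Object(), partition));
            }
        } finally {
            executor.shutdown();
        }
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Registration tasks did not finish in time");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.size();
    }
}
```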



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -379,6 +437,37 @@ public void onScheduled(final ProcessContext context) {
         if (maxUncommittedSizeConfigured) {
             maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
         }
+
+        if (ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties())) {
+            final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+            final int partitionCount = getPartitionCount(connectionService);
+
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+
+                KafkaConsumerService service;
+                while ((service = consumerServices.poll()) != null) {
+                    close(service, "Not all partitions are assigned");
+                }

Review Comment:
   I think this can never happen, because no consumers have been created at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
