pvillard31 commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2843349156


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
 
         getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();
+            if (session == null) {
+                getLogger().debug("No active session during rebalance callback, nothing to commit");
+                return;
+            }
+
+            getLogger().info("Rebalance callback invoked for {} revoked partitions, committing session synchronously",
+                    revokedPartitions.size());
+
+            try {
+                session.commit();

Review Comment:
   I'm not sure I agree with that comment. The synchronous commit is 
intentional and necessary. The whole point of this fix is that:
   - The Kafka consumer is only in a valid state to commit offsets during 
`onPartitionsRevoked()`
   - Once this callback returns, the consumer's ownership of the revoked partitions is released
   - Any commit attempt after that fails with `RebalanceInProgressException`
   
   Using `commitAsync()` would defeat the purpose, because the async callback 
would execute after `onPartitionsRevoked()` returns, no? At that point the 
Kafka consumer is no longer in a valid state to commit, and we'd get the same 
duplicate message issue. IMO, the sequence must be:
   1. `onPartitionsRevoked()` is called by Kafka
   2. NiFi session commit (must complete before step 3)
   3. Kafka offset commit (must happen while still in valid callback)
   4. `onPartitionsRevoked()` returns
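
   To make the ordering concrete, here is a minimal plain-Java sketch of the 
   four steps above. It is only a model: `Session`, `OffsetCommitter`, and 
   `RevocationOrderSketch` are hypothetical stand-ins, not the NiFi or Kafka 
   APIs, and the `log` list exists purely to record the order of the calls.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical model of the revocation sequence; not NiFi or Kafka code.
   class RevocationOrderSketch {

       /** Stand-in for the NiFi session (assumption, not ProcessSession itself). */
       interface Session { void commit(); }

       /** Stand-in for a blocking Kafka offset commit (assumption). */
       interface OffsetCommitter { void commitSync(); }

       /** Models step 1: the callback body Kafka would invoke on revocation. */
       static void onPartitionsRevoked(Session session, OffsetCommitter committer, List<String> log) {
           log.add("callback-enter");
           try {
               if (session == null) {
                   return; // nothing in flight, nothing to commit
               }
               session.commit();       // step 2: blocks until the session commit is done
               committer.commitSync(); // step 3: blocks while the callback is still running
           } finally {
               log.add("callback-return"); // step 4: only now does the callback return
           }
       }

       /** Runs the sequence once with recording stubs and returns the call order. */
       static List<String> run() {
           List<String> log = new ArrayList<>();
           onPartitionsRevoked(() -> log.add("session-commit"), () -> log.add("offset-commit"), log);
           return log;
       }

       public static void main(String[] args) {
           // prints [callback-enter, session-commit, offset-commit, callback-return]
           System.out.println(run());
       }
   }
   ```

   Because both commits are blocking calls made inside the callback, neither 
   can be deferred past step 4, which is the property I think we need here.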
   
   Or did I miss something with your comment?


