pvillard31 commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2843349156
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final
ProcessContext context) {
getLogger().info("No Kafka Consumer Service available; creating a new
one. Active count: {}", activeCount);
final KafkaConnectionService connectionService =
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
- return connectionService.getConsumerService(pollingContext);
+ final KafkaConsumerService newService =
connectionService.getConsumerService(pollingContext);
+ newService.setRebalanceCallback(createRebalanceCallback());
+ return newService;
+ }
+
+ private RebalanceCallback createRebalanceCallback() {
+ return revokedPartitions -> {
+ final ProcessSession session = currentSession.get();
+ final OffsetTracker offsetTracker = currentOffsetTracker.get();
+ if (session == null) {
+ getLogger().debug("No active session during rebalance
callback, nothing to commit");
+ return;
+ }
+
+ getLogger().info("Rebalance callback invoked for {} revoked
partitions, committing session synchronously",
+ revokedPartitions.size());
+
+ try {
+ session.commit();
Review Comment:
I'm not sure I agree with that comment. The synchronous commit is
intentional and necessary. The whole point of this fix is that:
- The Kafka consumer is only in a valid state to commit offsets during
`onPartitionsRevoked()`
- Once this callback returns, the consumer's group membership is revoked
- Any commit attempt after that fails with `RebalanceInProgressException`
Using `commitAsync()` would defeat the purpose because the async callback
would execute after `onPartitionsRevoked()` returns, no? At which point the
Kafka consumer is no longer valid and we'd get the same duplicate message
issue. IMO, the sequence must be:
1. `onPartitionsRevoked()` is called by Kafka
2. NiFi session commit (must complete before step 3)
3. Kafka offset commit (must happen while still in valid callback)
4. `onPartitionsRevoked()` returns
Or did I miss something with your comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]