pvillard31 commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2843357079


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final 
ProcessContext context) {
 
         getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = 
connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();

Review Comment:
   I don't think we can just pass the session directly because the consumer 
service is created once and pooled, and the session is different for each 
`onTrigger()` call. I'll change for a holder approach that goes with your 
previous comment for `ThreadLocal`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to