pvillard31 commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2843357079
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
         getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();
Review Comment:
I don't think we can pass the session directly: the consumer service is
created once and pooled, whereas the session is different for each
`onTrigger()` call. I'll switch to a holder approach, in line with your
previous comment about `ThreadLocal`.
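
For illustration only, here is a minimal sketch of the holder idea, not the code in this PR. `PooledConsumer`, `Session`, and `HolderSketch` are hypothetical stand-ins for the pooled consumer service, `ProcessSession`, and the processor, and an `AtomicReference` is assumed as the holder (a `ThreadLocal` could be swapped in). The callback is registered once and looks up the current session only when it is invoked:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for a pooled consumer service: created once, reused across triggers.
final class PooledConsumer {
    private Consumer<List<Integer>> rebalanceCallback = partitions -> { };

    void setRebalanceCallback(final Consumer<List<Integer>> callback) {
        this.rebalanceCallback = callback;
    }

    // Simulates the client reporting revoked partitions during a poll.
    void simulateRevocation(final List<Integer> partitions) {
        rebalanceCallback.accept(partitions);
    }
}

// Hypothetical stand-in for a per-trigger session; records what was handled through it.
final class Session {
    final List<Integer> handledPartitions = new ArrayList<>();
}

final class HolderSketch {
    // The holder: the long-lived callback reads it, each trigger writes it.
    private final AtomicReference<Session> currentSession = new AtomicReference<>();
    private final PooledConsumer consumer = new PooledConsumer();

    HolderSketch() {
        // Registered once. The session is resolved at invocation time,
        // so the callback never captures a stale session.
        consumer.setRebalanceCallback(revokedPartitions -> {
            final Session session = currentSession.get();
            if (session != null) {
                session.handledPartitions.addAll(revokedPartitions);
            }
        });
    }

    void onTrigger(final Session session, final List<Integer> revokedDuringPoll) {
        currentSession.set(session);
        try {
            consumer.simulateRevocation(revokedDuringPoll);
        } finally {
            // Clear the holder so a later callback cannot touch a finished session.
            currentSession.set(null);
        }
    }

    public static void main(final String[] args) {
        final HolderSketch sketch = new HolderSketch();
        final Session first = new Session();
        final Session second = new Session();
        sketch.onTrigger(first, Arrays.asList(0, 1));
        sketch.onTrigger(second, Arrays.asList(2));
        System.out.println(first.handledPartitions + " " + second.handledPartitions);
    }
}
```

In this sketch each trigger sees only its own revocations, even though the consumer and its callback outlive both sessions. A single holder like this assumes one consumer per owner; with several pooled consumers used concurrently, the holder would need to be scoped per consumer or per thread.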