turcsanyip commented on code in PR #9971:
URL: https://github.com/apache/nifi/pull/9971#discussion_r2106763589
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -353,73 +358,101 @@ public void onStopped() {
     KafkaConsumerService service;
     while ((service = consumerServices.poll()) != null) {
-        try {
-            service.close();
-        } catch (IOException e) {
-            getLogger().warn("Failed to close Kafka Consumer Service", e);
-        }
+        close(service);
     }
 }

 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) {
     final KafkaConsumerService consumerService = getConsumerService(context);
+    if (consumerService == null) {
+        getLogger().debug("No Kafka Consumer Service available; will yield and return immediately");
+        context.yield();
+        return;
+    }

     final long maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
     final long stopTime = System.currentTimeMillis() + maxUncommittedMillis;
     final OffsetTracker offsetTracker = new OffsetTracker();
+    boolean recordsReceived = false;

-    try {
-        while (System.currentTimeMillis() < stopTime) {
-            try {
-                final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
-                final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
-                if (!consumerRecords.hasNext()) {
-                    getLogger().debug("No Kafka Records consumed: {}", pollingContext);
-                    continue;
-                }
-
-                processConsumerRecords(context, session, offsetTracker, consumerRecords);
-            } catch (final Exception e) {
-                getLogger().error("Failed to consume Kafka Records", e);
-                consumerService.rollback();
-
-                try {
-                    consumerService.close();
-                } catch (final IOException ex) {
-                    getLogger().warn("Failed to close Kafka Consumer Service", ex);
-                }
+    while (System.currentTimeMillis() < stopTime) {
+        try {
+            final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
+            if (maxWaitDuration.toMillis() <= 0) {
                 break;
             }
+
+            final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+            if (!consumerRecords.hasNext()) {
+                getLogger().trace("No Kafka Records consumed: {}", pollingContext);
+                continue;
+            }
+
+            recordsReceived = true;
+            processConsumerRecords(context, session, offsetTracker, consumerRecords);
+        } catch (final Exception e) {
+            getLogger().error("Failed to consume Kafka Records", e);
+            consumerService.rollback();
+            close(consumerService);
+            context.yield();
+            break;
Review Comment:
The following scenario is quite rare, but it can happen and leads to FlowFile duplication and an inconsistent state: the polling `while` loop runs multiple times within one `onTrigger()`, the first iteration succeeds, but a subsequent one fails.

**Detailed steps:**

1st iteration of the `while` loop succeeds =>
- `recordsReceived == true`
- a FlowFile is created (uncommitted)

2nd iteration fails =>
- the Kafka offset is rolled back and the consumer is closed
- after breaking out of the loop, `session.commit()` is called
- the FlowFile is committed
- the Kafka offset commit fails (as the consumer is already closed)
- `close()` is called one more time

**Problems:**
- the Kafka offset was rolled back, but the processor transferred and committed the FlowFile => the Kafka message will be processed again (duplicated) in the next execution
- `close()` was called twice => double decrement, so the counter goes below 0
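The double-decrement part could be avoided by making `close()` idempotent. A minimal sketch of that idea (class, field, and method names here are hypothetical, not taken from the PR):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a close guard so a second close() call (e.g. once from the error
// handler and once from onStopped()) does not decrement the counter twice.
class GuardedConsumerService {
    private static final AtomicInteger activeConsumers = new AtomicInteger();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    GuardedConsumerService() {
        activeConsumers.incrementAndGet();
    }

    void close() {
        // compareAndSet flips closed exactly once, so the decrement
        // runs at most once no matter how many times close() is called.
        if (closed.compareAndSet(false, true)) {
            activeConsumers.decrementAndGet();
        }
    }

    static int activeCount() {
        return activeConsumers.get();
    }
}

public class CloseGuardDemo {
    public static void main(String[] args) {
        GuardedConsumerService service = new GuardedConsumerService();
        service.close();
        service.close(); // second call is a no-op
        System.out.println(GuardedConsumerService.activeCount()); // prints 0, not -1
    }
}
```

This only addresses the counter underflow; the duplication issue would still need the session to be rolled back (or the offsets left uncommitted) consistently with the FlowFile commit.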
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]