markap14 commented on code in PR #9971:
URL: https://github.com/apache/nifi/pull/9971#discussion_r2111904132
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -353,73 +358,101 @@ public void onStopped() {
KafkaConsumerService service;
while ((service = consumerServices.poll()) != null) {
- try {
- service.close();
- } catch (IOException e) {
- getLogger().warn("Failed to close Kafka Consumer Service", e);
- }
+ close(service);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
final KafkaConsumerService consumerService =
getConsumerService(context);
+ if (consumerService == null) {
+ getLogger().debug("No Kafka Consumer Service available; will yield
and return immediately");
+ context.yield();
+ return;
+ }
final long maxUncommittedMillis =
context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final long stopTime = System.currentTimeMillis() +
maxUncommittedMillis;
final OffsetTracker offsetTracker = new OffsetTracker();
+ boolean recordsReceived = false;
- try {
- while (System.currentTimeMillis() < stopTime) {
- try {
- final Duration maxWaitDuration =
Duration.ofMillis(stopTime - System.currentTimeMillis());
- final Iterator<ByteRecord> consumerRecords =
consumerService.poll(maxWaitDuration).iterator();
- if (!consumerRecords.hasNext()) {
- getLogger().debug("No Kafka Records consumed: {}",
pollingContext);
- continue;
- }
-
- processConsumerRecords(context, session, offsetTracker,
consumerRecords);
- } catch (final Exception e) {
- getLogger().error("Failed to consume Kafka Records", e);
- consumerService.rollback();
-
- try {
- consumerService.close();
- } catch (final IOException ex) {
- getLogger().warn("Failed to close Kafka Consumer
Service", ex);
- }
+ while (System.currentTimeMillis() < stopTime) {
+ try {
+ final Duration maxWaitDuration = Duration.ofMillis(stopTime -
System.currentTimeMillis());
+ if (maxWaitDuration.toMillis() <= 0) {
break;
}
+
+ final Iterator<ByteRecord> consumerRecords =
consumerService.poll(maxWaitDuration).iterator();
+ if (!consumerRecords.hasNext()) {
+ getLogger().trace("No Kafka Records consumed: {}",
pollingContext);
+ continue;
+ }
+
+ recordsReceived = true;
+ processConsumerRecords(context, session, offsetTracker,
consumerRecords);
+ } catch (final Exception e) {
+ getLogger().error("Failed to consume Kafka Records", e);
+ consumerService.rollback();
+ close(consumerService);
+ context.yield();
+ break;
Review Comment:
Thanks for reviewing @turcsanyip and great catch. I pushed an update. In the
even that we catch an Exception there, we will now rollback the ProcessSession
so that any previously created/transferred FlowFiles now get rolled back. I
also updated the close(KafkaConsumerService) to check
KafkaConsumerService.isClosed() before decrementing the count. This should
address each of the two concerns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]