pvillard31 commented on code in PR #9807:
URL: https://github.com/apache/nifi/pull/9807#discussion_r1999452948
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -365,23 +363,53 @@ public void onStopped() {
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         final KafkaConsumerService consumerService = getConsumerService(context);
+        final long maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long stopTime = System.currentTimeMillis() + maxUncommittedMillis;
+        final OffsetTracker offsetTracker = new OffsetTracker();
+
         try {
-            final Iterator<ByteRecord> consumerRecords = consumerService.poll().iterator();
-            if (!consumerRecords.hasNext()) {
-                getLogger().debug("No Kafka Records consumed: {}", pollingContext);
-                return;
+            while (System.currentTimeMillis() < stopTime) {
+                try {
+                    final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
+                    final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+                    if (!consumerRecords.hasNext()) {
+                        getLogger().debug("No Kafka Records consumed: {}", pollingContext);
+                        continue;
+                    }
+
+                    processConsumerRecords(context, session, offsetTracker, consumerRecords);
+                } catch (final Exception e) {
+                    getLogger().error("Failed to consume Kafka Records", e);
+                    consumerService.rollback();
+
+                    try {
+                        consumerService.close();
+                    } catch (final IOException ex) {
+                        getLogger().warn("Failed to close Kafka Consumer Service", ex);
+                    }
Review Comment:
I think calling `close()` explicitly here is easier to read than an empty try
block. That said, I can make the change if there is consensus that the empty
block is better.
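
For comparison, here is a minimal sketch of the two styles side by side. It assumes that "empty try block" means a try-with-resources statement with an empty body, which may not be exactly what was proposed. `FakeService` and the `CloseStyles` class are hypothetical stand-ins, not the NiFi `KafkaConsumerService`, and the warnings list stands in for `getLogger().warn(...)`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CloseStyles {

    // Hypothetical stand-in for the consumer service; not a NiFi class.
    static final class FakeService implements Closeable {
        private final boolean failOnClose;
        boolean closed;

        FakeService(final boolean failOnClose) {
            this.failOnClose = failOnClose;
        }

        @Override
        public void close() throws IOException {
            closed = true;
            if (failOnClose) {
                throw new IOException("simulated close failure");
            }
        }
    }

    // Style 1: explicit close() call inside try/catch, as in the diff.
    static List<String> closeExplicitly(final FakeService service) {
        final List<String> warnings = new ArrayList<>();
        try {
            service.close();
        } catch (final IOException ex) {
            warnings.add("Failed to close service: " + ex.getMessage());
        }
        return warnings;
    }

    // Style 2: try-with-resources with an empty body.
    // close() is invoked implicitly when the (empty) block exits,
    // and an IOException thrown by close() reaches the same catch clause.
    static List<String> closeWithEmptyTry(final FakeService service) {
        final List<String> warnings = new ArrayList<>();
        try (FakeService ignored = service) {
            // intentionally empty: the resource is closed on exit
        } catch (final IOException ex) {
            warnings.add("Failed to close service: " + ex.getMessage());
        }
        return warnings;
    }

    public static void main(final String[] args) {
        System.out.println(closeExplicitly(new FakeService(true)));
        System.out.println(closeWithEmptyTry(new FakeService(true)));
    }
}
```

Both methods close the service and record the same warning on failure, so the choice is mostly about readability: the explicit call states the intent directly, while the empty block relies on the reader knowing that the resource is closed on exit.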