This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 387ae7d323 NIFI-10199: PublishKafka InFlightMessageTracker should not 
log batch level errors on each record
387ae7d323 is described below

commit 387ae7d32318c1cd765c1794aff70f4b80a40b28
Author: Daniel Urban <[email protected]>
AuthorDate: Fri Jul 8 10:39:56 2022 +0200

    NIFI-10199: PublishKafka InFlightMessageTracker should not log batch level 
errors on each record
    
    Currently, when a batch level error occurs (e.g. delivery timeout), the 
same error is logged for each record of the batch, needlessly flooding the logs.
    InFlightMessageTracker now only logs an exception when a flow file 
encounters it for the first time.
    
    This closes #6185
    Signed-off-by: Paul Grey <[email protected]>
---
 .../nifi/processors/kafka/pubsub/InFlightMessageTracker.java   | 10 +++++++++-
 .../nifi/processors/kafka/pubsub/InFlightMessageTracker.java   | 10 +++++++++-
 .../nifi/processors/kafka/pubsub/InFlightMessageTracker.java   | 10 +++++++++-
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 3ec3d1c53f..92dcf86249 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
 public class InFlightMessageTracker {
     private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = 
new ConcurrentHashMap<>();
     private final ConcurrentMap<FlowFile, Exception> failures = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures 
= new ConcurrentHashMap<>();
     private final Object progressMutex = new Object();
     private final ComponentLog logger;
 
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
 
     public void fail(final FlowFile flowFile, final Exception exception) {
         failures.putIfAbsent(flowFile, exception);
-        logger.error("Failed to send " + flowFile + " to Kafka", exception);
+        boolean newException = encounteredFailures
+            .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+            .add(exception);
+        if (newException) {
+            logger.error("Failed to send {} to Kafka", flowFile, exception);
+        }
 
         synchronized (progressMutex) {
             progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
     public void reset() {
         messageCountsByFlowFile.clear();
         failures.clear();
+        encounteredFailures.clear();
     }
 
     public PublishResult failOutstanding(final Exception exception) {
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 3ec3d1c53f..92dcf86249 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
 public class InFlightMessageTracker {
     private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = 
new ConcurrentHashMap<>();
     private final ConcurrentMap<FlowFile, Exception> failures = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures 
= new ConcurrentHashMap<>();
     private final Object progressMutex = new Object();
     private final ComponentLog logger;
 
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
 
     public void fail(final FlowFile flowFile, final Exception exception) {
         failures.putIfAbsent(flowFile, exception);
-        logger.error("Failed to send " + flowFile + " to Kafka", exception);
+        boolean newException = encounteredFailures
+            .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+            .add(exception);
+        if (newException) {
+            logger.error("Failed to send {} to Kafka", flowFile, exception);
+        }
 
         synchronized (progressMutex) {
             progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
     public void reset() {
         messageCountsByFlowFile.clear();
         failures.clear();
+        encounteredFailures.clear();
     }
 
     public PublishResult failOutstanding(final Exception exception) {
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index f93d315db7..92dcf86249 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
 public class InFlightMessageTracker {
     private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = 
new ConcurrentHashMap<>();
     private final ConcurrentMap<FlowFile, Exception> failures = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures 
= new ConcurrentHashMap<>();
     private final Object progressMutex = new Object();
     private final ComponentLog logger;
 
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
 
     public void fail(final FlowFile flowFile, final Exception exception) {
         failures.putIfAbsent(flowFile, exception);
-        logger.error("Failed to send {} to Kafka", flowFile, exception);
+        boolean newException = encounteredFailures
+            .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+            .add(exception);
+        if (newException) {
+            logger.error("Failed to send {} to Kafka", flowFile, exception);
+        }
 
         synchronized (progressMutex) {
             progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
     public void reset() {
         messageCountsByFlowFile.clear();
         failures.clear();
+        encounteredFailures.clear();
     }
 
     public PublishResult failOutstanding(final Exception exception) {

Reply via email to