This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 387ae7d323 NIFI-10199: PublishKafka InFlightMessageTracker should not
log batch level errors on each record
387ae7d323 is described below
commit 387ae7d32318c1cd765c1794aff70f4b80a40b28
Author: Daniel Urban <[email protected]>
AuthorDate: Fri Jul 8 10:39:56 2022 +0200
NIFI-10199: PublishKafka InFlightMessageTracker should not log batch level
errors on each record
Currently, when a batch level error occurs (e.g. delivery timeout), the
same error is logged for each record of the batch, needlessly flooding the logs.
InFlightMessageTracker now only logs an exception when a flow file
encounters it for the first time.
This closes #6185
Signed-off-by: Paul Grey <[email protected]>
---
.../nifi/processors/kafka/pubsub/InFlightMessageTracker.java | 10 +++++++++-
.../nifi/processors/kafka/pubsub/InFlightMessageTracker.java | 10 +++++++++-
.../nifi/processors/kafka/pubsub/InFlightMessageTracker.java | 10 +++++++++-
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 3ec3d1c53f..92dcf86249 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile =
new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures
= new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
- logger.error("Failed to send " + flowFile + " to Kafka", exception);
+ boolean newException = encounteredFailures
+ .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+ .add(exception);
+ if (newException) {
+ logger.error("Failed to send {} to Kafka", flowFile, exception);
+ }
synchronized (progressMutex) {
progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
+ encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 3ec3d1c53f..92dcf86249 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile =
new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures
= new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
- logger.error("Failed to send " + flowFile + " to Kafka", exception);
+ boolean newException = encounteredFailures
+ .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+ .add(exception);
+ if (newException) {
+ logger.error("Failed to send {} to Kafka", flowFile, exception);
+ }
synchronized (progressMutex) {
progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
+ encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index f93d315db7..92dcf86249 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile =
new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures
= new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
- logger.error("Failed to send {} to Kafka", flowFile, exception);
+ boolean newException = encounteredFailures
+ .computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
+ .add(exception);
+ if (newException) {
+ logger.error("Failed to send {} to Kafka", flowFile, exception);
+ }
synchronized (progressMutex) {
progressMutex.notify();
@@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
+ encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {