This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 75c661bbbe NIFI-12194 Added Yield on Exceptions in Kafka Processors
75c661bbbe is described below

commit 75c661bbbe56a7951974a701921af9da74dd0d68
Author: Paul Grey <[email protected]>
AuthorDate: Mon Oct 30 15:15:52 2023 -0400

    NIFI-12194 Added Yield on Exceptions in Kafka Processors
    
    - Catching KafkaException and yielding for publisher lease requests improves behavior when the Processor is unable to connect to Kafka Brokers
    
    This closes #7955
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java     |  6 ++++--
 .../nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java      |  6 ++++--
 .../processors/kafka/pubsub/PublishKafkaRecord_2_6.java     | 13 ++++++++++++-
 .../nifi/processors/kafka/pubsub/PublishKafka_2_6.java      | 13 ++++++++++++-
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 525f621e1f..50fece3b35 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -540,9 +540,11 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaCl
                 getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
                     + "Will roll back session and discard any partially 
received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so 
will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka 
so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 4421ae92f8..a5c6b15891 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -483,9 +483,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor 
implements KafkaClientCo
                 getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
                     + "Will roll back session and discard any partially 
received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so 
will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka 
so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index af61faeb95..34053d6a3b 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -505,7 +506,7 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaPu
         }
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -588,6 +589,16 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaPu
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final 
PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private Function<Record, Integer> getPartitioner(final ProcessContext 
context, final FlowFile flowFile) {
         final String partitionClass = 
context.getProperty(PARTITION_CLASS).getValue();
         if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index b2721a7199..b6b84ce1e0 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -439,7 +440,7 @@ public class PublishKafka_2_6 extends AbstractProcessor 
implements KafkaPublishC
         final PublishFailureStrategy failureStrategy = 
getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -512,6 +513,16 @@ public class PublishKafka_2_6 extends AbstractProcessor 
implements KafkaPublishC
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final 
PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private PublishFailureStrategy getFailureStrategy(final ProcessContext 
context) {
         final String strategy = 
context.getProperty(FAILURE_STRATEGY).getValue();
         if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {

Reply via email to