This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 9a5a56e79e NIFI-12194 Added Yield on Exceptions in Kafka Processors
9a5a56e79e is described below

commit 9a5a56e79eb26f0c6ccf4d7f6cd9a1fef308c2eb
Author: Paul Grey <[email protected]>
AuthorDate: Mon Oct 30 15:15:52 2023 -0400

    NIFI-12194 Added Yield on Exceptions in Kafka Processors
    
    - Catching KafkaException and yielding for publisher lease requests 
improves behavior when the Processor is unable to connect to Kafka Brokers
    
    This closes #7955
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 75c661bbbe56a7951974a701921af9da74dd0d68)
---
 .../processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java     |  6 ++++--
 .../nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java      |  6 ++++--
 .../processors/kafka/pubsub/PublishKafkaRecord_2_6.java     | 13 ++++++++++++-
 .../nifi/processors/kafka/pubsub/PublishKafka_2_6.java      | 13 ++++++++++++-
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index d4d3f56a32..bdf42b8c02 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -546,9 +546,11 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaCl
                 getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
                     + "Will roll back session and discard any partially 
received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so 
will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka 
so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index b7e3a78338..8bd3398747 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -489,9 +489,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor 
implements KafkaClientCo
                 getLogger().warn("Was interrupted while trying to communicate 
with Kafka with lease {}. "
                     + "Will roll back session and discard any partially 
received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so 
will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so 
will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka 
so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka 
so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index f15a97e98a..05939e338f 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -511,7 +512,7 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaPu
         }
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -594,6 +595,16 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements KafkaPu
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final 
PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private Function<Record, Integer> getPartitioner(final ProcessContext 
context, final FlowFile flowFile) {
         final String partitionClass = 
context.getProperty(PARTITION_CLASS).getValue();
         if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index ca1e381acf..acab0fe1cd 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -444,7 +445,7 @@ public class PublishKafka_2_6 extends AbstractProcessor 
implements KafkaPublishC
         final PublishFailureStrategy failureStrategy = 
getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -517,6 +518,16 @@ public class PublishKafka_2_6 extends AbstractProcessor 
implements KafkaPublishC
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final 
PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private PublishFailureStrategy getFailureStrategy(final ProcessContext 
context) {
         final String strategy = 
context.getProperty(FAILURE_STRATEGY).getValue();
         if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {

Reply via email to