This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 9a5a56e79e NIFI-12194 Added Yield on Exceptions in Kafka Processors
9a5a56e79e is described below
commit 9a5a56e79eb26f0c6ccf4d7f6cd9a1fef308c2eb
Author: Paul Grey <[email protected]>
AuthorDate: Mon Oct 30 15:15:52 2023 -0400
NIFI-12194 Added Yield on Exceptions in Kafka Processors
- Catching KafkaException and yielding for publisher lease requests
improves behavior when the Processor is unable to connect to Kafka Brokers
This closes #7955
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 75c661bbbe56a7951974a701921af9da74dd0d68)
---
.../processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java | 6 ++++--
.../nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java | 6 ++++--
.../processors/kafka/pubsub/PublishKafkaRecord_2_6.java | 13 ++++++++++++-
.../nifi/processors/kafka/pubsub/PublishKafka_2_6.java | 13 ++++++++++++-
4 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index d4d3f56a32..bdf42b8c02 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -546,9 +546,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
getLogger().warn("Was interrupted while trying to communicate
with Kafka with lease {}. "
+ "Will roll back session and discard any partially
received data.", lease);
} catch (final KafkaException kex) {
- getLogger().error("Exception while interacting with Kafka so
will close the lease {} due to {}", lease, kex, kex);
+ getLogger().error("Exception while interacting with Kafka so
will close the lease {}", lease, kex);
+ context.yield();
} catch (final Throwable t) {
- getLogger().error("Exception while processing data from kafka
so will close the lease {} due to {}", lease, t, t);
+ getLogger().error("Exception while processing data from kafka
so will close the lease {}", lease, t);
+ context.yield();
} finally {
activeLeases.remove(lease);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index b7e3a78338..8bd3398747 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -489,9 +489,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
getLogger().warn("Was interrupted while trying to communicate
with Kafka with lease {}. "
+ "Will roll back session and discard any partially
received data.", lease);
} catch (final KafkaException kex) {
- getLogger().error("Exception while interacting with Kafka so
will close the lease {} due to {}", lease, kex, kex);
+ getLogger().error("Exception while interacting with Kafka so
will close the lease {}", lease, kex);
+ context.yield();
} catch (final Throwable t) {
- getLogger().error("Exception while processing data from kafka
so will close the lease {} due to {}", lease, t, t);
+ getLogger().error("Exception while processing data from kafka
so will close the lease {}", lease, t);
+ context.yield();
} finally {
activeLeases.remove(lease);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index f15a97e98a..05939e338f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -511,7 +512,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
}
final long startTime = System.nanoTime();
- try (final PublisherLease lease = pool.obtainPublisher()) {
+ try (final PublisherLease lease = obtainPublisher(context, pool)) {
try {
if (useTransactions) {
lease.beginTransaction();
@@ -594,6 +595,16 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
}
}
+ private PublisherLease obtainPublisher(final ProcessContext context, final
PublisherPool pool) {
+ try {
+ return pool.obtainPublisher();
+ } catch (final KafkaException e) {
+ getLogger().error("Failed to obtain Kafka Producer", e);
+ context.yield();
+ throw e;
+ }
+ }
+
private Function<Record, Integer> getPartitioner(final ProcessContext
context, final FlowFile flowFile) {
final String partitionClass =
context.getProperty(PARTITION_CLASS).getValue();
if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index ca1e381acf..acab0fe1cd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -18,6 +18,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -444,7 +445,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
final PublishFailureStrategy failureStrategy =
getFailureStrategy(context);
final long startTime = System.nanoTime();
- try (final PublisherLease lease = pool.obtainPublisher()) {
+ try (final PublisherLease lease = obtainPublisher(context, pool)) {
try {
if (useTransactions) {
lease.beginTransaction();
@@ -517,6 +518,16 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
}
}
+ private PublisherLease obtainPublisher(final ProcessContext context, final
PublisherPool pool) {
+ try {
+ return pool.obtainPublisher();
+ } catch (final KafkaException e) {
+ getLogger().error("Failed to obtain Kafka Producer", e);
+ context.yield();
+ throw e;
+ }
+ }
+
private PublishFailureStrategy getFailureStrategy(final ProcessContext
context) {
final String strategy =
context.getProperty(FAILURE_STRATEGY).getValue();
if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {