Repository: nifi Updated Branches: refs/heads/master cef7b6c73 -> b729bf4c1
NIFI-1088: Ensure that FlowFile is penalized before routing to failure Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9515b746 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9515b746 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9515b746 Branch: refs/heads/master Commit: 9515b7460713ba985a6d7c8ad033fe2c1ac98e3d Parents: dc4004d Author: Mark Payne <[email protected]> Authored: Fri Oct 30 14:25:27 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Fri Oct 30 14:25:27 2015 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/nifi/processors/kafka/PutKafka.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9515b746/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index cff285c..09025a4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -401,7 +401,7 @@ public class PutKafka extends AbstractProcessor { getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final Exception e) { getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); error = true; } finally { if (error) { @@ -534,7 +534,7 @@ public class PutKafka extends AbstractProcessor { if (offset == 0L) { // all of the messages failed to send. Route FlowFile to failure getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); } else { // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); @@ -545,7 +545,7 @@ public class PutKafka extends AbstractProcessor { messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); session.transfer(successfulMessages, REL_SUCCESS); - session.transfer(failedMessages, REL_FAILURE); + session.transfer(session.penalize(failedMessages), REL_FAILURE); session.remove(flowFile); session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); }
