Repository: nifi
Updated Branches:
  refs/heads/master cef7b6c73 -> b729bf4c1


NIFI-1088: Ensure that FlowFile is penalized before routing to failure


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9515b746
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9515b746
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9515b746

Branch: refs/heads/master
Commit: 9515b7460713ba985a6d7c8ad033fe2c1ac98e3d
Parents: dc4004d
Author: Mark Payne <[email protected]>
Authored: Fri Oct 30 14:25:27 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Oct 30 14:25:27 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/kafka/PutKafka.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9515b746/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index cff285c..09025a4 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -401,7 +401,7 @@ public class PutKafka extends AbstractProcessor {
                 getLogger().info("Successfully sent {} to Kafka in {} millis", 
new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
             } catch (final Exception e) {
                 getLogger().error("Failed to send {} to Kafka due to {}; 
routing to failure", new Object[] { flowFile, e });
-                session.transfer(flowFile, REL_FAILURE);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
                 error = true;
             } finally {
                 if (error) {
@@ -534,7 +534,7 @@ public class PutKafka extends AbstractProcessor {
                 if (offset == 0L) {
                     // all of the messages failed to send. Route FlowFile to 
failure
                     getLogger().error("Failed to send {} to Kafka due to {}; 
routing to fialure", new Object[] { flowFile, pe.getCause() });
-                    session.transfer(flowFile, REL_FAILURE);
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
                 } else {
                     // Some of the messages were sent successfully. We want to 
split off the successful messages from the failed messages.
                     final FlowFile successfulMessages = 
session.clone(flowFile, 0L, offset);
@@ -545,7 +545,7 @@ public class PutKafka extends AbstractProcessor {
                         messagesSent.get(), flowFile, successfulMessages, 
failedMessages, pe.getCause() });
 
                     session.transfer(successfulMessages, REL_SUCCESS);
-                    session.transfer(failedMessages, REL_FAILURE);
+                    session.transfer(session.penalize(failedMessages), 
REL_FAILURE);
                     session.remove(flowFile);
                     session.getProvenanceReporter().send(successfulMessages, 
"kafka://" + topic);
                 }

Reply via email to