NIFI-220: Added error handling that was missing for one call to
producer.send; also indicated how many messages were sent per FlowFile in the
log message and provenance event


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a77fb501
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a77fb501
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a77fb501

Branch: refs/heads/NIFI-250
Commit: a77fb50116642c9692e18046ba42663ea8240087
Parents: 95b22a0
Author: Mark Payne <[email protected]>
Authored: Tue Jan 13 19:27:07 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Jan 13 19:27:07 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a77fb501/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 4b5a742..51f9ef1 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -98,15 +98,15 @@ public class PutKafka extends AbstractProcessor {
                .defaultValue(DELIVERY_BEST_EFFORT.getValue())
                .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
-                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
-                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
-                    + "sent as a separate Kafka message.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Message Delimiter")
+        .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+                + "If not specified, the entire content of the FlowFile will be used as a single message. "
+                + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+                + "sent as a separate Kafka message.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
         .name("Max Buffer Size")
         .description("The maximum amount of data to buffer in memory before sending to Kafka")
@@ -366,7 +366,11 @@ public class PutKafka extends AbstractProcessor {
 
                             // If there are messages left, send them
                             if ( !messages.isEmpty() ) {
-                                producer.send(messages);
+                                try {
+                                    producer.send(messages);
+                                } catch (final Exception e) {
+                                    throw new ProcessException("Failed to send messages to Kafka", e);
+                                }
                             }
                         }
                     }
@@ -374,9 +378,9 @@ public class PutKafka extends AbstractProcessor {
                 
                 final long nanos = System.nanoTime() - start;
                 
-                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
             } catch (final ProcessException pe) {
                 error = true;
                 

Reply via email to