NIFI-220: Added error handling that was missing for one instance of calling producer.send, also indicated how many messages were sent per FlowFile in log message and provenance event
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a77fb501 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a77fb501 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a77fb501 Branch: refs/heads/NIFI-250 Commit: a77fb50116642c9692e18046ba42663ea8240087 Parents: 95b22a0 Author: Mark Payne <[email protected]> Authored: Tue Jan 13 19:27:07 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Jan 13 19:27:07 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/processors/kafka/PutKafka.java | 28 +++++++++++--------- 1 file changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a77fb501/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 4b5a742..51f9ef1 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -98,15 +98,15 @@ public class PutKafka extends AbstractProcessor { .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Max Buffer Size") .description("The maximum amount of data to buffer in memory before sending to Kafka") @@ -366,7 +366,11 @@ public class PutKafka extends AbstractProcessor { // If there are messages left, send them if ( !messages.isEmpty() ) { - producer.send(messages); + try { + producer.send(messages); + } catch (final Exception e) { + throw new ProcessException("Failed to send messages to Kafka", e); + } } } } @@ -374,9 +378,9 @@ public class PutKafka extends AbstractProcessor { final long nanos = System.nanoTime() - start; - session.getProvenanceReporter().send(flowFile, "kafka://" + topic); + session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); } catch (final ProcessException pe) { error = true;
