Repository: nifi Updated Branches: refs/heads/master dd8c26e35 -> 3d6e66409
NIFI-1672 Improved the Provenance Events emitted by PutKafka This closes #355 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d6e6640 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d6e6640 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d6e6640 Branch: refs/heads/master Commit: 3d6e6640972f62bb748c0c741fc74e1f7835a920 Parents: dd8c26e Author: Pierre Villard <[email protected]> Authored: Fri Apr 15 15:25:28 2016 +0200 Committer: Oleg Zhurakousky <[email protected]> Committed: Wed Apr 20 10:33:03 2016 -0400 ---------------------------------------------------------------------- .../nifi/processors/kafka/KafkaPublisher.java | 28 ++++++++++++++++++-- .../apache/nifi/processors/kafka/PutKafka.java | 25 +++++++++++++---- 2 files changed, 46 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index bcf10a4..afb2cc6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -119,6 +119,30 @@ class KafkaPublisher implements AutoCloseable { */ BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, int maxBufferSize) { + List<Future<RecordMetadata>> sendFutures = 
this.split(messageContext, contentStream, partitionKey, maxBufferSize); + return this.publish(sendFutures); + } + + /** + * This method splits (if required) the incoming content stream into + * messages to publish to Kafka topic. See publish method for more + * details + * + * @param messageContext + * instance of {@link SplittableMessageContext} which holds + * context information about the message to be sent + * @param contentStream + * instance of open {@link InputStream} carrying the content of + * the message(s) to be sent to Kafka + * @param partitionKey + * the value of the partition key. Only relevant if the user wishes + * to provide a custom partition key instead of relying on + * variety of provided {@link Partitioner}(s) + * @param maxBufferSize maximum message size + * @return The list of messages to publish + */ + List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, + int maxBufferSize) { List<Future<RecordMetadata>> sendFutures = new ArrayList<>(); BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments(); int segmentCounter = 0; @@ -139,13 +163,13 @@ class KafkaPublisher implements AutoCloseable { segmentCounter++; } } - return this.processAcks(sendFutures); + return sendFutures; } /** * */ - private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) { + BitSet publish(List<Future<RecordMetadata>> sendFutures) { int segmentCounter = 0; BitSet failedSegments = new BitSet(); for (Future<RecordMetadata> future : sendFutures) { http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 3b5eb4f..2cf0245 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -30,10 +30,12 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -54,6 +56,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @@ -159,9 +162,9 @@ public class PutKafka extends AbstractProcessor { + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " + "message. 
Note that if messages are delimited and some messages for a given FlowFile are transferred " - + "successfully while others are not, the messages will be split into individual FlowFiles, such that those " - + "messages that were successfully sent are routed to the 'success' relationship while other messages are " - + "sent to the 'failure' relationship.") + + "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. In " + + "case the FlowFile is sent back to this processor, only the messages not previously transferred " + + "successfully will be handled by the processor to be retransferred to Kafka.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -292,19 +295,31 @@ public class PutKafka extends AbstractProcessor { final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session); final Integer partitionKey = this.determinePartition(messageContext, context, flowFile); final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>(); + final List<Future<RecordMetadata>> sendFutures = new ArrayList<>(); + + StopWatch timer = new StopWatch(true); session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream contentStream) throws IOException { int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue(); - failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize)); + sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize)); + failedSegmentsRef.set(kafkaPublisher.publish(sendFutures)); } }); + timer.stop(); + final long duration = timer.getDuration(TimeUnit.MILLISECONDS); + final int messagesToSend = sendFutures.size(); + final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality(); + final String details = messagesSent + " message(s) over " + 
messagesToSend + " sent successfully"; if (failedSegmentsRef.get().isEmpty()) { - session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName()); + session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration); flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session); session.transfer(flowFile, REL_SUCCESS); } else { + if(messagesSent != 0) { + session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration); + } flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext)); session.transfer(session.penalize(flowFile), REL_FAILURE); }
