http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 2cf0245..3f6aec4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -30,25 +29,20 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -56,7 +50,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StopWatch; +import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @@ -67,7 +61,7 @@ import org.apache.nifi.util.StopWatch; + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" + " overriden with warning message describing the override." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") -public class PutKafka extends AbstractProcessor { +public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; @@ -162,9 +156,9 @@ public class PutKafka extends AbstractProcessor { + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred " - + "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. In " - + "case the FlowFile is sent back to this processor, only the messages not previously transferred " - + "successfully will be handled by the processor to be retransferred to Kafka.") + + "successfully while others are not, the messages will be split into individual FlowFiles, such that those " + + "messages that were successfully sent are routed to the 'success' relationship while other messages are " + + "sent to the 'failure' relationship.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -199,13 +193,14 @@ public class PutKafka extends AbstractProcessor { .expressionLanguageSupported(false) .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Batch Size").displayName("Batch Size") - .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready " - + "to send or \"Queue Buffering Max Time\" is reached. NOTE: This property will be ignored unless the 'Message Delimiter' " - + "property is specified.") + .name("Async Batch Size") + .displayName("Batch Size") + .description("This configuration controls the default batch size in bytes.The producer will attempt to batch records together into " + + "fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client " + + "and the server.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("200") + .defaultValue("16384") // Kafka default .build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() .name("Queue Buffering Max Time") @@ -236,17 +231,15 @@ public class PutKafka extends AbstractProcessor { .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .build(); - protected static final String ATTR_PROC_ID = "PROC_ID"; + protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; - protected static final String ATTR_FAILED_SEGMENTS = "FS"; + protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; - protected static final String ATTR_TOPIC = "TOPIC"; + protected static final String FAILED_TOPIC_ATTR = "failed.topic"; - protected static final String ATTR_KEY = "KEY"; + protected static final String FAILED_KEY_ATTR = "failed.key"; - protected static final String ATTR_DELIMITER = "DELIMITER"; - - private volatile KafkaPublisher kafkaPublisher; + protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; private static final List<PropertyDescriptor> propertyDescriptors; @@ -276,66 +269,117 @@ public class PutKafka extends AbstractProcessor { relationships = Collections.unmodifiableSet(_relationships); } - /** - * - */ - @OnScheduled - public void createKafkaPublisher(ProcessContext context) { - this.kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); - this.kafkaPublisher.setProcessLog(this.getLogger()); - } /** + * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} + * producing a result {@link FlowFile}. + * <br> + * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS} + * <br> + * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE} * */ @Override - public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException { + boolean processed = false; FlowFile flowFile = session.get(); if (flowFile != null) { - final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session); - final Integer partitionKey = this.determinePartition(messageContext, context, flowFile); - final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>(); - final List<Future<RecordMetadata>> sendFutures = new ArrayList<>(); - - StopWatch timer = new StopWatch(true); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream contentStream) throws IOException { - int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue(); - sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize)); - failedSegmentsRef.set(kafkaPublisher.publish(sendFutures)); - } - }); - timer.stop(); - - final long duration = timer.getDuration(TimeUnit.MILLISECONDS); - final int messagesToSend = sendFutures.size(); - final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality(); - final String details = messagesSent + " message(s) over " + messagesToSend + " sent successfully"; - if (failedSegmentsRef.get().isEmpty()) { - session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration); - flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session); + flowFile = this.doRendezvousWithKafka(flowFile, context, session); + if (!this.isFailedFlowFile(flowFile)) { + session.getProvenanceReporter().send(flowFile, + context.getProperty(SEED_BROKERS).getValue() + "/" + + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue()); session.transfer(flowFile, REL_SUCCESS); } else { - if(messagesSent != 0) { - session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration); - } - flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext)); session.transfer(session.penalize(flowFile), REL_FAILURE); } + processed = true; + } + return processed; + } + + /** + * Will rendezvous with {@link KafkaPublisher} after building + * {@link PublishingContext} and will produce the resulting {@link FlowFile}. + * The resulting FlowFile contains all required information to determine + * if message publishing originated from the provided FlowFile has actually + * succeeded fully, partially or failed completely (see + * {@link #isFailedFlowFile(FlowFile)}. + */ + private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { + final AtomicReference<KafkaPublisherResult> publishResultRef = new AtomicReference<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream contentStream) throws IOException { + PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream); + KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext); + publishResultRef.set(result); + } + }); + + FlowFile resultFile = publishResultRef.get().isAllAcked() + ? this.cleanUpFlowFileIfNecessary(flowFile, session) + : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); + + return resultFile; + } + + /** + * Builds {@link PublishingContext} for message(s) to be sent to Kafka. + * {@link PublishingContext} contains all contextual information required by + * {@link KafkaPublisher} to publish to Kafka. Such information contains + * things like topic name, content stream, delimiter, key and last ACKed + * message for cases where provided FlowFile is being retried (failed in the + * past). <br> + * For the clean FlowFile (file that has been sent for the first time), + * PublishingContext will be built form {@link ProcessContext} associated + * with this invocation. <br> + * For the failed FlowFile, {@link PublishingContext} will be built from + * attributes of that FlowFile which by then will already contain required + * information (e.g., topic, key, delimiter etc.). This is required to + * ensure the affinity of the retry in the even where processor + * configuration has changed. However keep in mind that failed FlowFile is + * only considered a failed FlowFile if it is being re-processed by the same + * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see + * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to + * another PublishKafka processor it is treated as a fresh FlowFile + * regardless if it has #FAILED* attributes set. + */ + private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, + InputStream contentStream) { + String topicName; + byte[] keyBytes; + byte[] delimiterBytes = null; + int lastAckedMessageIndex = -1; + if (this.isFailedFlowFile(flowFile)) { + lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); + topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); + keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null + ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null; + delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null + ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; } else { - context.yield(); + topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); + delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() ? context.getProperty(MESSAGE_DELIMITER) + .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; } + + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setKeyBytes(keyBytes); + publishingContext.setDelimiterBytes(delimiterBytes); + publishingContext.setPartitionId(this.determinePartition(context, flowFile)); + return publishingContext; } - @OnStopped - public void cleanup() { - try { - this.kafkaPublisher.close(); - } catch (Exception e) { - getLogger().warn("Failed while closing KafkaPublisher", e); - } + /** + * Returns 'true' if provided FlowFile is a failed FlowFile. A failed + * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. + */ + private boolean isFailedFlowFile(FlowFile flowFile) { + return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR)); } @Override @@ -344,6 +388,14 @@ public class PutKafka extends AbstractProcessor { } @Override + protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) + throws ProcessException { + KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); + kafkaPublisher.setProcessLog(this.getLogger()); + return kafkaPublisher; + } + + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return propertyDescriptors; } @@ -374,12 +426,14 @@ public class PutKafka extends AbstractProcessor { * */ private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { - if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) { - flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS); - flowFile = session.removeAttribute(flowFile, ATTR_KEY); - flowFile = session.removeAttribute(flowFile, ATTR_TOPIC); - flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER); - flowFile = session.removeAttribute(flowFile, ATTR_PROC_ID); + if (this.isFailedFlowFile(flowFile)) { + Set<String> keysToRemove = new HashSet<>(); + keysToRemove.add(FAILED_DELIMITER_ATTR); + keysToRemove.add(FAILED_KEY_ATTR); + keysToRemove.add(FAILED_TOPIC_ATTR); + keysToRemove.add(FAILED_PROC_ID_ATTR); + keysToRemove.add(FAILED_LAST_ACK_IDX); + flowFile = session.removeAllAttributes(flowFile, keysToRemove); } return flowFile; } @@ -387,7 +441,7 @@ public class PutKafka extends AbstractProcessor { /** * */ - private Integer determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) { + private Integer determinePartition(ProcessContext context, FlowFile flowFile) { String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); Integer partitionValue = null; if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { @@ -400,49 +454,30 @@ public class PutKafka extends AbstractProcessor { } /** + * Builds a {@link Map} of FAILED_* attributes * + * @see #FAILED_PROC_ID_ATTR + * @see #FAILED_LAST_ACK_IDX + * @see #FAILED_TOPIC_ATTR + * @see #FAILED_KEY_ATTR + * @see #FAILED_DELIMITER_ATTR */ - private Map<String, String> buildFailedFlowFileAttributes(BitSet failedSegments, SplittableMessageContext messageContext) { + private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, + ProcessContext context) { Map<String, String> attributes = new HashMap<>(); - attributes.put(ATTR_PROC_ID, this.getIdentifier()); - attributes.put(ATTR_FAILED_SEGMENTS, new String(failedSegments.toByteArray(), StandardCharsets.UTF_8)); - attributes.put(ATTR_TOPIC, messageContext.getTopicName()); - attributes.put(ATTR_KEY, messageContext.getKeyBytesAsString()); - attributes.put(ATTR_DELIMITER, new String(messageContext.getDelimiterBytes(), StandardCharsets.UTF_8)); + attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); + attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); + attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DELIMITER).isSet() + ? context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(sourceFlowFile).getValue() + : null); return attributes; } /** * */ - private SplittableMessageContext buildMessageContext(FlowFile flowFile, ProcessContext context, ProcessSession session) { - String topicName; - byte[] key; - byte[] delimiterBytes; - - String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS); - if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) { - topicName = flowFile.getAttribute(ATTR_TOPIC); - key = flowFile.getAttribute(ATTR_KEY) == null ? null : flowFile.getAttribute(ATTR_KEY).getBytes(); - delimiterBytes = flowFile.getAttribute(ATTR_DELIMITER) != null ? flowFile.getAttribute(ATTR_DELIMITER).getBytes(StandardCharsets.UTF_8) : null; - } else { - failedSegmentsString = null; - topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - key = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); - delimiterBytes = context.getProperty(MESSAGE_DELIMITER).isSet() - ? context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; - } - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, key, delimiterBytes); - if (failedSegmentsString != null) { - messageContext.setFailedSegmentsAsByteArray(failedSegmentsString.getBytes()); - } - return messageContext; - } - - /** - * - */ private Properties buildKafkaConfigProperties(final ProcessContext context) { Properties properties = new Properties(); String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); @@ -450,11 +485,7 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue())); properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue()); - if (context.getProperty(MESSAGE_DELIMITER).isSet()) { - properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - } else { - properties.setProperty("batch.size", "1"); - } + properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java deleted file mode 100644 index d5f1c0b..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.nio.charset.StandardCharsets; -import java.util.BitSet; - -import org.apache.nifi.flowfile.FlowFile; - -/** - * Context object that serves as a bridge between the content of a FlowFile and - * Kafka message(s). It contains all necessary information to allow - * {@link KafkaPublisher} to determine how a each content of the - * {@link FlowFile} must be sent to Kafka. - */ -final class SplittableMessageContext { - private final String topicName; - - private final byte[] delimiterBytes; - - private final byte[] keyBytes; - - private volatile BitSet failedSegments; - - /** - * @param topicName - * the name of the Kafka topic - * @param keyBytes - * the instance of byte[] representing the key. Can be null. - * @param delimiterBytes - * byte array representing bytes by which the data will be - * delimited. Can be null. - */ - SplittableMessageContext(String topicName, byte[] keyBytes, byte[] delimiterBytes) { - if (topicName == null || topicName.trim().length() == 0){ - throw new IllegalArgumentException("'topicName' must not be null or empty"); - } - this.topicName = topicName; - this.keyBytes = keyBytes; - this.delimiterBytes = delimiterBytes != null ? delimiterBytes : null; - } - - /** - * - */ - @Override - public String toString() { - String delVal = this.delimiterBytes != null ? " delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'" : ""; - return "topic: '" + topicName + "';" + delVal; - } - - /** - * Will set failed segments from an array of integers - */ - void setFailedSegments(int... failedSegments) { - if (failedSegments != null) { - this.failedSegments = new BitSet(); - for (int failedSegment : failedSegments) { - this.failedSegments.set(failedSegment); - } - } - } - - /** - * Will set failed segments from an array of bytes that will be used to - * construct the final {@link BitSet} representing failed segments - */ - void setFailedSegmentsAsByteArray(byte[] failedSegments) { - if (failedSegments != null) { - this.failedSegments = BitSet.valueOf(failedSegments); - } - } - - /** - * Returns the list of integers representing the segments (chunks) of the - * delimited content stream that had failed to be sent to Kafka topic. - */ - BitSet getFailedSegments() { - return this.failedSegments; - } - - /** - * Returns the name of the Kafka topic - */ - String getTopicName() { - return this.topicName; - } - - /** - * Returns the delimiter bytes - */ - byte[] getDelimiterBytes() { - return this.delimiterBytes; - } - - /** - * Returns the key bytes as String - */ - String getKeyBytesAsString() { - return this.keyBytes != null ? new String(this.keyBytes) : null; - } - - /** - * Returns the key bytes - */ - byte[] getKeyBytes() { - return this.keyBytes; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/GetKafkaIntegrationTests.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/GetKafkaIntegrationTests.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/GetKafkaIntegrationTests.java index 9b2614f..effc8e7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/GetKafkaIntegrationTests.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/GetKafkaIntegrationTests.java @@ -28,8 +28,12 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; +@Ignore +// The test is valid and should be ran when working on this module. @Ignore is +// to speed up the overall build public class GetKafkaIntegrationTests { private static EmbeddedKafka kafkaLocal; http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index 46507d2..c4fc9a8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; import org.junit.AfterClass; @@ -41,6 +43,8 @@ import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +// The test is valid and should be ran when working on this module. @Ignore is +// to speed up the overall build public class KafkaPublisherTest { private static EmbeddedKafka kafkaLocal; @@ -62,17 +66,18 @@ public class KafkaPublisherTest { @Test public void validateSuccessfulSendAsWhole() throws Exception { - InputStream fis = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8)); + InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8)); String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null); + PublishingContext publishingContext = new PublishingContext(contentStream, topicName); + KafkaPublisherResult result = publisher.publish(publishingContext); - publisher.publish(messageContext, fis, null, 2000); - - fis.close(); + assertEquals(0, result.getLastMessageAcked()); + assertEquals(1, result.getMessagesSent()); + contentStream.close(); publisher.close(); ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); @@ -86,16 +91,20 @@ public class KafkaPublisherTest { @Test public void validateSuccessfulSendAsDelimited() throws Exception { - InputStream fis = new ByteArrayInputStream( - "Hello Kafka 1\nHello Kafka 2\nHello Kafka 3\nHello Kafka 4\n".getBytes(StandardCharsets.UTF_8)); + InputStream contentStream = new ByteArrayInputStream( + "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8)); String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n".getBytes(StandardCharsets.UTF_8)); + PublishingContext publishingContext = new PublishingContext(contentStream, topicName); + publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); + KafkaPublisherResult result = publisher.publish(publishingContext); - publisher.publish(messageContext, fis, null, 2000); + assertEquals(3, result.getLastMessageAcked()); + assertEquals(4, result.getMessagesSent()); + contentStream.close(); publisher.close(); ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); @@ -111,48 +120,111 @@ public class KafkaPublisherTest { } } + /* + * This test simulates the condition where not all messages were ACKed by + * Kafka + */ @Test - public void validateSuccessfulReSendOfFailedSegments() throws Exception { - InputStream fis = new ByteArrayInputStream( - "Hello Kafka 1\nHello Kafka 2\nHello Kafka 3\nHello Kafka 4\n".getBytes(StandardCharsets.UTF_8)); + public void validateRetries() throws Exception { + byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); + InputStream contentStream = new ByteArrayInputStream(testValue); String topicName = "validateSuccessfulReSendOfFailedSegments"; Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n".getBytes(StandardCharsets.UTF_8)); - messageContext.setFailedSegments(1, 3); + // simulates the first re-try + int lastAckedMessageIndex = 1; + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(messageContext, fis, null, 2000); - publisher.close(); + publisher.publish(publishingContext); ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); String m1 = new String(iter.next().message()); String m2 = new String(iter.next().message()); - assertEquals("Hello Kafka 2", m1); - assertEquals("Hello Kafka 4", m2); + assertEquals("Hello Kafka3", m1); + assertEquals("Hello Kafka4", m2); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + + // simulates the second re-try + lastAckedMessageIndex = 2; + contentStream = new ByteArrayInputStream(testValue); + publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); + publisher.publish(publishingContext); + + m1 = new String(iter.next().message()); + assertEquals("Hello Kafka4", m1); + + publisher.close(); + } + + /* + * Similar to the above test, but it sets the first retry index to the last + * possible message index and second index to an out of bound index. The + * expectation is that no messages will be sent to Kafka + */ + @Test + public void validateRetriesWithWrongIndex() throws Exception { + byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); + InputStream contentStream = new ByteArrayInputStream(testValue); + String topicName = "validateRetriesWithWrongIndex"; + + Properties kafkaProperties = this.buildProducerProperties(); + + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + + // simulates the first re-try + int lastAckedMessageIndex = 3; + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); + + publisher.publish(publishingContext); + + ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); + try { iter.next(); fail(); } catch (ConsumerTimeoutException e) { // that's OK since this is the Kafka mechanism to unblock } + + // simulates the second re-try + lastAckedMessageIndex = 6; + contentStream = new ByteArrayInputStream(testValue); + publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); + publisher.publish(publishingContext); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + + publisher.close(); } @Test - public void validateWithMultiByteCharacters() throws Exception { + public void validateWithMultiByteCharactersNoDelimiter() throws Exception { String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; - InputStream fis = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); String topicName = "validateWithMultiByteCharacters"; Properties kafkaProperties = this.buildProducerProperties(); KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null); - - publisher.publish(messageContext, fis, null, 2000); + publisher.publish(publishingContext); publisher.close(); ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); @@ -162,12 +234,10 @@ public class KafkaPublisherTest { private Properties buildProducerProperties() { Properties kafkaProperties = new Properties(); - kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort()); - kafkaProperties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); - kafkaProperties.setProperty("acks", "1"); + kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName()); + kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName()); + kafkaProperties.setProperty("bootstrap.servers", "localhost:" + kafkaLocal.getKafkaPort()); kafkaProperties.put("auto.create.topics.enable", "true"); - kafkaProperties.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner"); - kafkaProperties.setProperty("timeout.ms", "5000"); return kafkaProperties; } http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java new file mode 100644 index 0000000..fbd2963 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.nifi.processors.kafka.test.EmbeddedKafka; +import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; + +// The test is valid and should be ran when working on this module. @Ignore is +// to speed up the overall build +public class PutKafkaTest { + + private static EmbeddedKafka kafkaLocal; + + private static EmbeddedKafkaProducerHelper producerHelper; + + @BeforeClass + public static void beforeClass() { + kafkaLocal = new EmbeddedKafka(); + kafkaLocal.start(); + producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); + } + + @AfterClass + public static void afterClass() throws Exception { + producerHelper.close(); + kafkaLocal.stop(); + } + + @Test + public void validateSingleCharacterDemarcatedMessages() { + String topicName = "validateSingleCharacterDemarcatedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8)); + + runner.shutdown(); + } + + @Test + public void validateMultiCharacterDelimiyedMessages() { + String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "foo"); + + runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8)); + + runner.shutdown(); + } + + @Test + public void validateDemarcationIntoEmptyMessages() { + String topicName = "validateDemarcationIntoEmptyMessages"; + PutKafka putKafka = new PutKafka(); + final TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + + final byte[] bytes = "\n\n\n1\n2\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8); + runner.enqueue(bytes); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + try { + consumer.next(); + fail(); + } catch (Exception e) { + // ignore + } + } + + @Test + public void validateComplexRightPartialDemarcatedMessages() { + String topicName = "validateComplexRightPartialDemarcatedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); + + runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >".getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("I Mean IT!å <å WILDSTUFFå >", new String(consumer.next().message(), StandardCharsets.UTF_8)); + runner.shutdown(); + } + + @Test + public void validateComplexLeftPartialDemarcatedMessages() { + String topicName = "validateComplexLeftPartialDemarcatedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); + + runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >å <å WILDSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + byte[] message = consumer.next().message(); + assertEquals("Hello World", new String(message, StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("<å WILDSTUFFå >å ", new String(consumer.next().message(), StandardCharsets.UTF_8)); + runner.shutdown(); + } + + @Test + public void validateComplexPartialMatchDemarcatedMessages() { + String topicName = "validateComplexPartialMatchDemarcatedMessages"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); + + runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDBOOMSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbyeå <å WILDBOOMSTUFFå >å ", new String(consumer.next().message(), StandardCharsets.UTF_8)); + runner.shutdown(); + } + + private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { + Properties props = new Properties(); + props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort()); + props.put("group.id", "test"); + props.put("consumer.timeout.ms", "5000"); + props.put("auto.offset.reset", "smallest"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + Map<String, Integer> topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, 1); + Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); + List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); + ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); + return iter; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java deleted file mode 100644 index 8b5048f..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/SplittableMessageContextTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.nio.charset.StandardCharsets; - -import org.junit.Test; - -public class SplittableMessageContextTest { - - @Test(expected = IllegalArgumentException.class) - public void failNullEmptyTopic() { - new SplittableMessageContext(null, null, null); - } - - @Test - public void validateFullSetting() { - SplittableMessageContext ctx = new SplittableMessageContext("foo", "hello".getBytes(), "\n".getBytes(StandardCharsets.UTF_8)); - ctx.setFailedSegments(1, 3, 6); - assertEquals("\n", new String(ctx.getDelimiterBytes(), StandardCharsets.UTF_8)); - assertEquals("hello", new String(ctx.getKeyBytes(), StandardCharsets.UTF_8)); - assertEquals("foo", ctx.getTopicName()); - assertEquals("topic: 'foo'; delimiter: '\n'", ctx.toString()); - } - - - @Test - public void validateToString() { - SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); - assertEquals("topic: 'foo';", ctx.toString()); - ctx = new SplittableMessageContext("foo", null, "blah".getBytes(StandardCharsets.UTF_8)); - assertEquals("topic: 'foo'; delimiter: 'blah'", ctx.toString()); - } - - @Test - public void validateNoNPEandNoSideffectsOnSetsGets() { - SplittableMessageContext ctx = new SplittableMessageContext("foo", null, null); - ctx.setFailedSegments(null); - assertNull(ctx.getFailedSegments()); - - ctx.setFailedSegmentsAsByteArray(null); - assertNull(ctx.getFailedSegments()); - - assertNull(ctx.getDelimiterBytes()); - assertNull(ctx.getKeyBytes()); - assertNull(ctx.getKeyBytesAsString()); - assertEquals("foo", ctx.getTopicName()); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java deleted file mode 100644 index 5d75d54..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.nio.charset.StandardCharsets; -import java.util.BitSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.nifi.processors.kafka.test.EmbeddedKafka; -import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; - - -public class TestPutKafka { - - private static EmbeddedKafka kafkaLocal; - - private static EmbeddedKafkaProducerHelper producerHelper; - - @BeforeClass - public static void bforeClass() { - kafkaLocal = new EmbeddedKafka(); - kafkaLocal.start(); - producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); - } - - @AfterClass - public static void afterClass() throws Exception { - producerHelper.close(); - kafkaLocal.stop(); - } - - @Test - @Ignore - public void testDelimitedMessagesWithKey() { - String topicName = "testDelimitedMessagesWithKey"; - PutKafka putKafka = new PutKafka(); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("1", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("3", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("4", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("5", new String(consumer.next().message(), StandardCharsets.UTF_8)); - - runner.shutdown(); - } - - @Test - @Ignore - public void testWithFailureAndPartialResend() throws Exception { - String topicName = "testWithFailureAndPartialResend"; - PutKafka putKafka = new PutKafka(); - final TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - final String text = "Hello World\nGoodbye\n1\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - afterClass(); // kill Kafka right before send to ensure producer fails - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); - MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS); - BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes(StandardCharsets.UTF_8)); - assertTrue(fs.get(0)); - assertTrue(fs.get(1)); - assertTrue(fs.get(2)); - assertTrue(fs.get(3)); - String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER); - assertEquals("\n", delimiter); - String key = ff.getAttribute(PutKafka.ATTR_KEY); - assertEquals("key1", key); - String topic = ff.getAttribute(PutKafka.ATTR_TOPIC); - assertEquals(topicName, topic); - - bforeClass(); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - Map<String, String> attr = new HashMap<>(ff.getAttributes()); - /* - * So here we are emulating partial success. Basically even though all 4 - * messages failed to be sent by changing the ATTR_FAILED_SEGMENTS value - * we essentially saying that only two failed and need to be resent. - */ - BitSet _fs = new BitSet(); - _fs.set(1); - _fs.set(3); - attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), StandardCharsets.UTF_8)); - ff.putAttributes(attr); - runner.enqueue(ff); - runner.run(1, false); - MockFlowFile sff = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); - assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS)); - assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC)); - assertNull(sff.getAttribute(PutKafka.ATTR_KEY)); - assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER)); - - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - - assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("2", new String(consumer.next().message(), StandardCharsets.UTF_8)); - try { - consumer.next(); - fail(); - } catch (Exception e) { - // ignore - } - } - - @Test - public void testWithEmptyMessages() { - String topicName = "testWithEmptyMessages"; - PutKafka putKafka = new PutKafka(); - final TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8); - runner.enqueue(bytes); - runner.run(1); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - assertNotNull(consumer.next()); - assertNotNull(consumer.next()); - assertNotNull(consumer.next()); - assertNotNull(consumer.next()); - try { - consumer.next(); - fail(); - } catch (Exception e) { - // ignore - } - } - - @Test - public void testComplexRightPartialDelimitedMessages() { - String topicName = "testComplexRightPartialDelimitedMessages"; - PutKafka putKafka = new PutKafka(); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("I Mean IT!å <å WILDSTUFFå >", new String(consumer.next().message(), StandardCharsets.UTF_8)); - runner.shutdown(); - } - - @Test - public void testComplexLeftPartialDelimitedMessages() { - String topicName = "testComplexLeftPartialDelimitedMessages"; - PutKafka putKafka = new PutKafka(); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >å <å WILDSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - byte[] message = consumer.next().message(); - assertEquals("Hello World", new String(message, StandardCharsets.UTF_8)); - assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("I Mean IT!", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("<å WILDSTUFFå >å ", new String(consumer.next().message(), StandardCharsets.UTF_8)); - runner.shutdown(); - } - - @Test - public void testComplexPartialMatchDelimitedMessages() { - String topicName = "testComplexPartialMatchDelimitedMessages"; - PutKafka putKafka = new PutKafka(); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PutKafka.TOPIC, topicName); - runner.setProperty(PutKafka.CLIENT_NAME, "foo"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDBOOMSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName); - assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); - assertEquals("Goodbyeå <å WILDBOOMSTUFFå >å ", new String(consumer.next().message(), StandardCharsets.UTF_8)); - runner.shutdown(); - } - - private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { - Properties props = new Properties(); - props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort()); - props.put("group.id", "test"); - props.put("consumer.timeout.ms", "5000"); - props.put("auto.offset.reset", "smallest"); - ConsumerConfig consumerConfig = new ConsumerConfig(props); - ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); - Map<String, Integer> topicCountMap = new HashMap<>(1); - topicCountMap.put(topic, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); - return iter; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml new file mode 100644 index 0000000..cbe581a --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml @@ -0,0 +1,35 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-bundle</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-kafka-pubsub-nar</artifactId> + <packaging>nar</packaging> + <description>NiFi NAR for interacting with Apache Kafka</description> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-pubsub-processors</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000..84b3bb9 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,299 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'Scala Library' under a BSD + style license. + + Copyright (c) 2002-2015 EPFL + Copyright (c) 2011-2015 Typesafe, Inc. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the EPFL nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS âAS ISâ AND ANY EXPRESS + OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'JLine' under a BSD + style license. + + Copyright (c) 2002-2006, Marc Prud'hommeaux <[email protected]> + All rights reserved. + + Redistribution and use in source and binary forms, with or + without modification, are permitted provided that the following + conditions are met: + + Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with + the distribution. + + Neither the name of JLine nor the names of its contributors + may be used to endorse or promote products derived from this + software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, + BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, + OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING + IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + OF THE POSSIBILITY OF SUCH DAMAGE. + + The binary distribution of this product bundles 'JOpt Simple' under an MIT + style license. + + Copyright (c) 2009 Paul R. Holser, Jr. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file
