Repository: incubator-nifi
Updated Branches:
  refs/heads/develop b2a1f5217 -> 8f2502c4e


NIFI-271


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8f2502c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8f2502c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8f2502c4

Branch: refs/heads/develop
Commit: 8f2502c4e476ba3ce00943d9d9f777b0637c1114
Parents: b2a1f52
Author: joewitt <[email protected]>
Authored: Sat Apr 25 09:20:35 2015 -0400
Committer: joewitt <[email protected]>
Committed: Sat Apr 25 09:20:35 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 438 ++++++++++---------
 .../apache/nifi/processors/kafka/PutKafka.java  | 285 ++++++------
 .../additionalDetails.html                      |  28 +-
 .../additionalDetails.html                      |  30 +-
 .../nifi/processors/kafka/TestGetKafka.java     |  58 ++-
 .../nifi/processors/kafka/TestPutKafka.java     | 189 ++++----
 nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml |  14 +-
 7 files changed, 525 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 63a816e..1b63a46 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -61,97 +61,101 @@ import org.apache.nifi.processor.util.StandardValidators;
 @SupportsBatching
 @CapabilityDescription("Fetches messages from Apache Kafka")
 @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
-@WritesAttributes({ @WritesAttribute(attribute = "kafka.topic", description = 
"The name of the Kafka Topic from which the message was received"),
-        @WritesAttribute(attribute = "kafka.key", description = "The key of 
the Kafka message, if it exists and batch size is 1. If the message does not 
have a key, or if the batch size is greater than 1, this attribute will not be 
added"),
-        @WritesAttribute(attribute = "kafka.partition", description = "The 
partition of the Kafka Topic from which the message was received. This 
attribute is added only if the batch size is 1"),
-        @WritesAttribute(attribute = "kafka.offset", description = "The offset 
of the message within the Kafka partition. This attribute is added only if the 
batch size is 1") })
+@WritesAttributes({
+    @WritesAttribute(attribute = "kafka.topic", description = "The name of the 
Kafka Topic from which the message was received"),
+    @WritesAttribute(attribute = "kafka.key", description = "The key of the 
Kafka message, if it exists and batch size is 1. If"
+            + " the message does not have a key, or if the batch size is 
greater than 1, this attribute will not be added"),
+    @WritesAttribute(attribute = "kafka.partition", description = "The 
partition of the Kafka Topic from which the message was received. This 
attribute is added only if the batch size is 1"),
+    @WritesAttribute(attribute = "kafka.offset", description = "The offset of 
the message within the Kafka partition. This attribute is added only if the 
batch size is 1")})
 public class GetKafka extends AbstractProcessor {
+
     public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new 
PropertyDescriptor.Builder()
-        .name("ZooKeeper Connection String")
-        .description("The Connection String to use in order to connect to 
ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. 
For example, host1:2181,host2:2181,host3:2188")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("ZooKeeper Connection String")
+            .description("The Connection String to use in order to connect to 
ZooKeeper. This is often a comma-separated list of <host>:<port>"
+                    + " combinations. For example, 
host1:2181,host2:2181,host3:2188")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
     public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
-        .name("Topic Name")
-        .description("The Kafka Topic to pull messages from")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Topic Name")
+            .description("The Kafka Topic to pull messages from")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
     public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new 
PropertyDescriptor.Builder()
-               .name("Zookeeper Commit Frequency")
-               .description("Specifies how often to communicate with ZooKeeper 
to indicate which messages have been pulled. A longer time period will result 
in better overall performance but can result in more data duplication if a NiFi 
node is lost")
-               .required(true)
-               .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-               .expressionLanguageSupported(false)
-               .defaultValue("60 secs")
-               .build();
+            .name("Zookeeper Commit Frequency")
+            .description("Specifies how often to communicate with ZooKeeper to 
indicate which messages have been pulled. A longer time period will"
+                    + " result in better overall performance but can result in 
more data duplication if a NiFi node is lost")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("60 secs")
+            .build();
     public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new 
PropertyDescriptor.Builder()
-           .name("ZooKeeper Communications Timeout")
-           .description("The amount of time to wait for a response from 
ZooKeeper before determining that there is a communications error")
-           .required(true)
-           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-           .expressionLanguageSupported(false)
-           .defaultValue("30 secs")
-           .build();
+            .name("ZooKeeper Communications Timeout")
+            .description("The amount of time to wait for a response from 
ZooKeeper before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs")
+            .build();
     public static final PropertyDescriptor KAFKA_TIMEOUT = new 
PropertyDescriptor.Builder()
-           .name("Kafka Communications Timeout")
-           .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
-           .required(true)
-           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-           .expressionLanguageSupported(false)
-           .defaultValue("30 secs")
-           .build();
+            .name("Kafka Communications Timeout")
+            .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs")
+            .build();
     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-        .name("Batch Size")
-        .description("Specifies the maximum number of messages to combine into 
a single FlowFile. These messages will be "
-                + "concatenated together with the <Message Demarcator> string 
placed between the content of each message. "
-                + "If the messages from Kafka should not be concatenated 
together, leave this value at 1.")
-        .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue("1")
-        .build();
+            .name("Batch Size")
+            .description("Specifies the maximum number of messages to combine 
into a single FlowFile. These messages will be "
+                    + "concatenated together with the <Message Demarcator> 
string placed between the content of each message. "
+                    + "If the messages from Kafka should not be concatenated 
together, leave this value at 1.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1")
+            .build();
     public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
-        .name("Message Demarcator")
-        .description("Specifies the characters to use in order to demarcate 
multiple messages from Kafka. If the <Batch Size> "
-                + "property is set to 1, this value is ignored. Otherwise, for 
each two subsequent messages in the batch, "
-                + "this value will be placed in between them.")
-        .required(true)
-        .addValidator(Validator.VALID)  // accept anything as a demarcator, 
including empty string
-        .expressionLanguageSupported(false)
-        .defaultValue("\\n")
-        .build();
+            .name("Message Demarcator")
+            .description("Specifies the characters to use in order to 
demarcate multiple messages from Kafka. If the <Batch Size> "
+                    + "property is set to 1, this value is ignored. Otherwise, 
for each two subsequent messages in the batch, "
+                    + "this value will be placed in between them.")
+            .required(true)
+            .addValidator(Validator.VALID) // accept anything as a demarcator, 
including empty string
+            .expressionLanguageSupported(false)
+            .defaultValue("\\n")
+            .build();
     public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
-        .name("Client Name")
-        .description("Client Name to use when communicating with Kafka")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Client Name")
+            .description("Client Name to use when communicating with Kafka")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-           .name("success")
-           .description("All FlowFiles that are created are routed to this 
relationship")
-           .build();
+            .name("success")
+            .description("All FlowFiles that are created are routed to this 
relationship")
+            .build();
 
-    
     private final BlockingQueue<ConsumerIterator<byte[], byte[]>> 
streamIterators = new LinkedBlockingQueue<>();
     private volatile ConsumerConnector consumer;
 
     final Lock interruptionLock = new ReentrantLock();
     // guarded by interruptionLock
     private final Set<Thread> interruptableThreads = new HashSet<>();
-    
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-       final PropertyDescriptor clientNameWithDefault = new 
PropertyDescriptor.Builder()
-               .fromPropertyDescriptor(CLIENT_NAME)
-               .defaultValue("NiFi-" + getIdentifier())
-               .build();
-       
+        final PropertyDescriptor clientNameWithDefault = new 
PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(CLIENT_NAME)
+                .defaultValue("NiFi-" + getIdentifier())
+                .build();
+
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(ZOOKEEPER_CONNECTION_STRING);
         props.add(TOPIC);
@@ -163,174 +167,174 @@ public class GetKafka extends AbstractProcessor {
         props.add(ZOOKEEPER_TIMEOUT);
         return props;
     }
-    
+
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>(1);
         relationships.add(REL_SUCCESS);
         return relationships;
     }
-    
+
     @OnScheduled
     public void createConsumers(final ProcessContext context) {
-       final String topic = context.getProperty(TOPIC).getValue();
-       
-       final Map<String, Integer> topicCountMap = new HashMap<>(1);
-       topicCountMap.put(topic, context.getMaxConcurrentTasks());
-       
-       final Properties props = new Properties();
-       props.setProperty("zookeeper.connect", 
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); 
-       props.setProperty("group.id", getIdentifier());
-       props.setProperty("auto.commit.interval.ms", 
String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
-       props.setProperty("auto.commit.enable", "true"); // just be explicit
-       props.setProperty("auto.offset.reset", "smallest");
-       
-       final ConsumerConfig consumerConfig = new ConsumerConfig(props);
-       consumer = Consumer.createJavaConsumerConnector(consumerConfig);
-       
-       final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
-       final List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(topic);
-       
-       this.streamIterators.clear();
-       
-       for ( final KafkaStream<byte[], byte[]> stream : streams ) {
-               streamIterators.add(stream.iterator());
-       }
+        final String topic = context.getProperty(TOPIC).getValue();
+
+        final Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, context.getMaxConcurrentTasks());
+
+        final Properties props = new Properties();
+        props.setProperty("zookeeper.connect", 
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
+        props.setProperty("group.id", getIdentifier());
+        props.setProperty("auto.commit.interval.ms", 
String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+        props.setProperty("auto.commit.enable", "true"); // just be explicit
+        props.setProperty("auto.offset.reset", "smallest");
+
+        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+
+        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
+        final List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(topic);
+
+        this.streamIterators.clear();
+
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
+            streamIterators.add(stream.iterator());
+        }
     }
-    
+
     @OnStopped
     public void shutdownConsumer() {
-       if ( consumer != null ) {
-               try {
-                       consumer.commitOffsets();
-               } finally {
-                       consumer.shutdown();
-               }
-       }
+        if (consumer != null) {
+            try {
+                consumer.commitOffsets();
+            } finally {
+                consumer.shutdown();
+            }
+        }
     }
-    
+
     @OnUnscheduled
     public void interruptIterators() {
-       // Kafka doesn't provide a non-blocking API for pulling messages. We 
can, however,
-       // interrupt the Threads. We do this when the Processor is stopped so 
that we have the
-       // ability to shutdown the Processor.
-       interruptionLock.lock();
-       try {
-               for ( final Thread t : interruptableThreads ) {
-                       t.interrupt();
-               }
-               
-               interruptableThreads.clear();
-       } finally {
-               interruptionLock.unlock();
-       }
+        // Kafka doesn't provide a non-blocking API for pulling messages. We 
can, however,
+        // interrupt the Threads. We do this when the Processor is stopped so 
that we have the
+        // ability to shut down the Processor.
+        interruptionLock.lock();
+        try {
+            for (final Thread t : interruptableThreads) {
+                t.interrupt();
+            }
+
+            interruptableThreads.clear();
+        } finally {
+            interruptionLock.unlock();
+        }
     }
-    
+
     protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
         return streamIterators.poll();
     }
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-       ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
-       if ( iterator == null ) {
-               return;
-       }
-       
-       final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-       final String demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
-       final byte[] demarcatorBytes = 
demarcator.getBytes(StandardCharsets.UTF_8);
-       final String topic = context.getProperty(TOPIC).getValue();
-       
-       FlowFile flowFile = null;
-       try {
-           // add the current thread to the Set of those to be interrupted if 
processor stopped.
-               interruptionLock.lock();
-               try {
-                       interruptableThreads.add(Thread.currentThread());
-               } finally {
-                       interruptionLock.unlock();
-               }
-               
-               final long start = System.nanoTime();
-               flowFile = session.create();
-               
-               final Map<String, String> attributes = new HashMap<>();
+        ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+        if (iterator == null) {
+            return;
+        }
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final String demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        final byte[] demarcatorBytes = 
demarcator.getBytes(StandardCharsets.UTF_8);
+        final String topic = context.getProperty(TOPIC).getValue();
+
+        FlowFile flowFile = null;
+        try {
+            // add the current thread to the Set of those to be interrupted if 
processor stopped.
+            interruptionLock.lock();
+            try {
+                interruptableThreads.add(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            final long start = System.nanoTime();
+            flowFile = session.create();
+
+            final Map<String, String> attributes = new HashMap<>();
             attributes.put("kafka.topic", topic);
 
             int numMessages = 0;
-               for (int msgCount = 0; msgCount < batchSize; msgCount++) {
-                   // if the processor is stopped, iterator.hasNext() will 
throw an Exception.
-                   // In this case, we just break out of the loop.
-                   try {
-                           if ( !iterator.hasNext() ) {
-                               break;
-                           }
-                   } catch (final Exception e) {
-                       break;
-                   }
-                   
-                       final MessageAndMetadata<byte[], byte[]> mam = 
iterator.next();
-                       if ( mam == null ) {
-                               return;
-                       }
-                       
-                       final byte[] key = mam.key();
-                       
-                       if ( batchSize == 1 ) {
-                           // the kafka.key, kafka.offset, and kafka.partition 
attributes are added only
-                           // for a batch size of 1.
-                           if ( key != null ) {
-                               attributes.put("kafka.key", new String(key, 
StandardCharsets.UTF_8));
-                           }
-                           
-                       attributes.put("kafka.offset", 
String.valueOf(mam.offset()));
-                       attributes.put("kafka.partition", 
String.valueOf(mam.partition()));
-                       }
-                       
-                       // add the message to the FlowFile's contents
-                       final boolean firstMessage = (msgCount == 0);
-                       flowFile = session.append(flowFile, new 
OutputStreamCallback() {
-                               @Override
-                               public void process(final OutputStream out) 
throws IOException {
-                                   if ( !firstMessage ) {
-                                       out.write(demarcatorBytes);
-                                   }
-                                       out.write(mam.message());
-                               }
-                       });
-                       numMessages++;
-               }
-               
-               // If we received no messages, remove the FlowFile. Otherwise, 
send to success.
-               if ( flowFile.getSize() == 0L ) {
-                   session.remove(flowFile);
-               } else {
-                       flowFile = session.putAllAttributes(flowFile, 
attributes);
-                       final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                       session.getProvenanceReporter().receive(flowFile, 
"kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
-                       getLogger().info("Successfully received {} from Kafka 
with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
-                       session.transfer(flowFile, REL_SUCCESS);
-               }
-       } catch (final Exception e) {
-               getLogger().error("Failed to receive FlowFile from Kafka due to 
{}", new Object[] {e});
-               if ( flowFile != null ) {
-                       session.remove(flowFile);
-               }
-       } finally {
-           // Remove the current thread from the Set of Threads to interrupt.
-               interruptionLock.lock();
-               try {
-                       interruptableThreads.remove(Thread.currentThread());
-               } finally {
-                       interruptionLock.unlock();
-               }
-               
-               // Add the iterator back to the queue
-               if ( iterator != null ) {
-                       streamIterators.offer(iterator);
-               }
-       }
+            for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+                // if the processor is stopped, iterator.hasNext() will throw 
an Exception.
+                // In this case, we just break out of the loop.
+                try {
+                    if (!iterator.hasNext()) {
+                        break;
+                    }
+                } catch (final Exception e) {
+                    break;
+                }
+
+                final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+                if (mam == null) {
+                    return;
+                }
+
+                final byte[] key = mam.key();
+
+                if (batchSize == 1) {
+                    // the kafka.key, kafka.offset, and kafka.partition 
attributes are added only
+                    // for a batch size of 1.
+                    if (key != null) {
+                        attributes.put("kafka.key", new String(key, 
StandardCharsets.UTF_8));
+                    }
+
+                    attributes.put("kafka.offset", 
String.valueOf(mam.offset()));
+                    attributes.put("kafka.partition", 
String.valueOf(mam.partition()));
+                }
+
+                // add the message to the FlowFile's contents
+                final boolean firstMessage = (msgCount == 0);
+                flowFile = session.append(flowFile, new OutputStreamCallback() 
{
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        if (!firstMessage) {
+                            out.write(demarcatorBytes);
+                        }
+                        out.write(mam.message());
+                    }
+                });
+                numMessages++;
+            }
+
+            // If we received no messages, remove the FlowFile. Otherwise, 
send to success.
+            if (flowFile.getSize() == 0L) {
+                session.remove(flowFile);
+            } else {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                session.getProvenanceReporter().receive(flowFile, "kafka://" + 
topic, "Received " + numMessages + " Kafka messages", millis);
+                getLogger().info("Successfully received {} from Kafka with {} 
messages in {} millis", new Object[]{flowFile, numMessages, millis});
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive FlowFile from Kafka due to 
{}", new Object[]{e});
+            if (flowFile != null) {
+                session.remove(flowFile);
+            }
+        } finally {
+            // Remove the current thread from the Set of Threads to interrupt.
+            interruptionLock.lock();
+            try {
+                interruptableThreads.remove(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            // Add the iterator back to the queue
+            if (iterator != null) {
+                streamIterators.offer(iterator);
+            }
+        }
     }
-       
+
 }
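
[Editor's note] On the demarcator handling in onTrigger() above: the property
value is stored with escaped sequences (the default is the literal
two-character string "\n") and is unescaped at trigger time before being
placed between batched messages. A minimal, self-contained sketch of just
that step:

    import java.nio.charset.StandardCharsets;

    public class DemarcatorSketch {
        public static void main(String[] args) {
            // Property default as declared above: the two characters '\' and 'n'
            String demarcator = "\\n";
            // The same unescaping onTrigger() performs before batching messages
            demarcator = demarcator.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
            final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
            // A single 0x0A byte is what ends up between messages in the FlowFile
            System.out.println(demarcatorBytes.length); // prints 1
        }
    }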

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index e0b7588..44b6584 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -61,95 +61,100 @@ import scala.actors.threadpool.Arrays;
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka")
 public class PutKafka extends AbstractProcessor {
+
     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
-    
-    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed 
to failure unless the message is replicated to the appropriate number of Kafka 
Nodes according to the Topic configuration");
-    public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed 
to success if the message is received by a single Kafka node, whether or not it 
is replicated. This is faster than <Guarantee Replicated Delivery> but can 
result in data loss if a Kafka node crashes");
-    public static final AllowableValue DELIVERY_BEST_EFFORT = new 
AllowableValue("0", "Best Effort", "FlowFile will be routed to success after 
successfully writing the content to a Kafka node, without waiting for a 
response. This provides the best performance but may result in data loss.");
-    
+
+    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed 
to"
+            + " failure unless the message is replicated to the appropriate 
number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+            + " to success if the message is received by a single Kafka node, 
whether or not it is replicated. This is faster than"
+            + " <Guarantee Replicated Delivery> but can result in data loss if 
a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new 
AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
+            + " successfully writing the content to a Kafka node, without 
waiting for a response. This provides the best performance but may result"
+            + " in data loss.");
+
     public static final PropertyDescriptor SEED_BROKERS = new 
PropertyDescriptor.Builder()
-        .name("Known Brokers")
-        .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
-        .required(true)
-        
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Known Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
+            .required(true)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(false)
+            .build();
     public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
-           .name("Topic Name")
-           .description("The Kafka Topic of interest")
-           .required(true)
-           .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-           .expressionLanguageSupported(true)
-           .build();
+            .name("Topic Name")
+            .description("The Kafka Topic of interest")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
-               .name("Kafka Key")
-               .description("The Key to use for the Message")
-               .required(false)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .expressionLanguageSupported(true)
-               .build();
+            .name("Kafka Key")
+            .description("The Key to use for the Message")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     public static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
-               .name("Delivery Guarantee")
-               .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka")
-               .required(true)
-               .expressionLanguageSupported(false)
-               .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
-               .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-               .build();
+            .name("Delivery Guarantee")
+            .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
+            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+            .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
-        .name("Message Delimiter")
-        .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
-                + "If not specified, the entire content of the FlowFile will 
be used as a single message. "
-                + "If specified, the contents of the FlowFile will be split on 
this delimiter and each section "
-                + "sent as a separate Kafka message.")
-        .required(false)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
+                    + "If not specified, the entire content of the FlowFile 
will be used as a single message. "
+                    + "If specified, the contents of the FlowFile will be 
split on this delimiter and each section "
+                    + "sent as a separate Kafka message.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-        .name("Max Buffer Size")
-        .description("The maximum amount of data to buffer in memory before 
sending to Kafka")
-        .required(true)
-        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue("1 MB")
-        .build();
+            .name("Max Buffer Size")
+            .description("The maximum amount of data to buffer in memory 
before sending to Kafka")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("1 MB")
+            .build();
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
-           .name("Communications Timeout")
-           .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
-           .required(true)
-           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-           .expressionLanguageSupported(false)
-           .defaultValue("30 secs")
-           .build();
+            .name("Communications Timeout")
+            .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs")
+            .build();
     public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
-           .name("Client Name")
-           .description("Client Name to use when communicating with Kafka")
-           .required(true)
-           .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-           .expressionLanguageSupported(false)
-           .build();
-
-    
+            .name("Client Name")
+            .description("Client Name to use when communicating with Kafka")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-           .name("success")
-           .description("Any FlowFile that is successfully sent to Kafka will 
be routed to this Relationship")
-           .build();
+            .name("success")
+            .description("Any FlowFile that is successfully sent to Kafka will 
be routed to this Relationship")
+            .build();
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-           .name("failure")
-           .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
-           .build();
+            .name("failure")
+            .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
+            .build();
 
     private final BlockingQueue<Producer<byte[], byte[]>> producers = new 
LinkedBlockingQueue<>();
-    
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-       final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
-               .fromPropertyDescriptor(CLIENT_NAME)
-               .defaultValue("NiFi-" + getIdentifier())
-               .build();
-       
+        final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(CLIENT_NAME)
+                .defaultValue("NiFi-" + getIdentifier())
+                .build();
+
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(SEED_BROKERS);
         props.add(TOPIC);
@@ -161,7 +166,7 @@ public class PutKafka extends AbstractProcessor {
         props.add(clientName);
         return props;
     }
-    
+
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>(1);
@@ -169,17 +174,16 @@ public class PutKafka extends AbstractProcessor {
         relationships.add(REL_FAILURE);
         return relationships;
     }
-    
-    
+
     @OnStopped
     public void closeProducers() {
-       Producer<byte[], byte[]> producer;
-       
-       while ((producer = producers.poll()) != null) {
-               producer.close();
-       }
+        Producer<byte[], byte[]> producer;
+
+        while ((producer = producers.poll()) != null) {
+            producer.close();
+        }
     }
-    
+
     protected ProducerConfig createConfig(final ProcessContext context) {
         final String brokers = context.getProperty(SEED_BROKERS).getValue();
 
@@ -188,76 +192,76 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("request.required.acks", 
context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("client.id", 
context.getProperty(CLIENT_NAME).getValue());
         properties.setProperty("request.timeout.ms", 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
-        
+
         properties.setProperty("message.send.max.retries", "1");
         properties.setProperty("producer.type", "sync");
-        
+
         return new ProducerConfig(properties);
     }
-    
+
     protected Producer<byte[], byte[]> createProducer(final ProcessContext 
context) {
-       return new Producer<>(createConfig(context));
+        return new Producer<>(createConfig(context));
     }
-    
+
     private Producer<byte[], byte[]> borrowProducer(final ProcessContext 
context) {
-       Producer<byte[], byte[]> producer = producers.poll();
-       return producer == null ? createProducer(context) : producer;
+        Producer<byte[], byte[]> producer = producers.poll();
+        return producer == null ? createProducer(context) : producer;
     }
-    
+
     private void returnProducer(final Producer<byte[], byte[]> producer) {
-       producers.offer(producer);
+        producers.offer(producer);
     }
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-       FlowFile flowFile = session.get();
-       if ( flowFile == null ) {
-               return;
-       }
-       
-       final long start = System.nanoTime();
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long start = System.nanoTime();
         final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         final byte[] keyBytes = (key == null) ? null : 
key.getBytes(StandardCharsets.UTF_8);
         String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if ( delimiter != null ) {
+        if (delimiter != null) {
             delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
         }
-        
+
         final long maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
         final Producer<byte[], byte[]> producer = borrowProducer(context);
-        
-        if ( delimiter == null ) {
+
+        if (delimiter == null) {
             // Send the entire FlowFile as a single message.
             final byte[] value = new byte[(int) flowFile.getSize()];
             session.read(flowFile, new InputStreamCallback() {
-                       @Override
-                       public void process(final InputStream in) throws 
IOException {
-                               StreamUtils.fillBuffer(in, value);
-                       }
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, value);
+                }
             });
-            
+
             boolean error = false;
             try {
                 final KeyedMessage<byte[], byte[]> message;
-                if ( key == null ) {
+                if (key == null) {
                     message = new KeyedMessage<>(topic, value);
                 } else {
                     message = new KeyedMessage<>(topic, keyBytes, value);
                 }
-                
+
                 producer.send(message);
                 final long nanos = System.nanoTime() - start;
-                
+
                 session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic);
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} to Kafka in {} millis", 
new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+                getLogger().info("Successfully sent {} to Kafka in {} millis", 
new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
             } catch (final Exception e) {
-                getLogger().error("Failed to send {} to Kafka due to {}; 
routing to failure", new Object[] {flowFile, e});
+                getLogger().error("Failed to send {} to Kafka due to {}; 
routing to failure", new Object[]{flowFile, e});
                 session.transfer(flowFile, REL_FAILURE);
                 error = true;
             } finally {
-                if ( error ) {
+                if (error) {
                     producer.close();
                 } else {
                     returnProducer(producer);
@@ -265,53 +269,53 @@ public class PutKafka extends AbstractProcessor {
             }
         } else {
             final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
-            
+
             // The NonThreadSafeCircularBuffer allows us to add a byte from 
the stream one at a time and see
             // if it matches some pattern. We can use this to search for the 
delimiter as we read through
             // the stream of bytes in the FlowFile
             final NonThreadSafeCircularBuffer buffer = new 
NonThreadSafeCircularBuffer(delimiterBytes);
-            
+
             boolean error = false;
             final LongHolder lastMessageOffset = new LongHolder(0L);
             final LongHolder messagesSent = new LongHolder(0L);
-            
+
             try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
                 session.read(flowFile, new InputStreamCallback() {
                     @Override
                     public void process(final InputStream rawIn) throws 
IOException {
                         byte[] data = null; // contents of a single message
-                        
+
                         boolean streamFinished = false;
-                        
+
                         final List<KeyedMessage<byte[], byte[]>> messages = 
new ArrayList<>(); // batch to send
                         long messageBytes = 0L; // size of messages in the 
'messages' list
-                        
+
                         int nextByte;
                         try (final InputStream bufferedIn = new 
BufferedInputStream(rawIn);
-                             final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
-                            
+                                final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
+
                             // read until we're out of data.
                             while (!streamFinished) {
                                 nextByte = in.read();
 
-                                if ( nextByte > -1 ) {
+                                if (nextByte > -1) {
                                     baos.write(nextByte);
                                 }
-                                
+
                                 if (nextByte == -1) {
                                     // we ran out of data. This message is 
complete.
                                     data = baos.toByteArray();
                                     streamFinished = true;
-                                } else if ( buffer.addAndCompare((byte) 
nextByte) ) {
+                                } else if (buffer.addAndCompare((byte) 
nextByte)) {
                                     // we matched our delimiter. This message 
is complete. We want all of the bytes from the
                                     // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
                                     // the delimiter itself to be sent.
                                     data = 
Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - 
delimiterBytes.length);
                                 }
-                                
-                                if ( data != null ) {
+
+                                if (data != null) {
                                     // If the message has no data, ignore it.
-                                    if ( data.length != 0 ) {
+                                    if (data.length != 0) {
                                         // either we ran out of data or we 
reached the end of the message.
                                         // Either way, create the message 
because it's ready to send.
                                         final KeyedMessage<byte[], byte[]> 
message;
@@ -361,7 +365,7 @@ public class PutKafka extends AbstractProcessor {
                             }
 
                             // If there are messages left, send them
-                            if ( !messages.isEmpty() ) {
+                            if (!messages.isEmpty()) {
                                 try {
                                     messagesSent.addAndGet(messages.size());   
 // add count of messages
                                     producer.send(messages);
@@ -372,44 +376,45 @@ public class PutKafka extends AbstractProcessor {
                         }
                     }
                 });
-                
+
                 final long nanos = System.nanoTime() - start;
                 session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic, "Sent " + messagesSent.get() + " messages");
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages to Kafka for 
{} in {} millis", new Object[] {messagesSent.get(), flowFile, 
TimeUnit.NANOSECONDS.toMillis(nanos)});
+                getLogger().info("Successfully sent {} messages to Kafka for 
{} in {} millis", new Object[]{messagesSent.get(), flowFile, 
TimeUnit.NANOSECONDS.toMillis(nanos)});
             } catch (final ProcessException pe) {
                 error = true;
-                
+
                 // There was a failure sending messages to Kafka. Iff the 
lastMessageOffset is 0, then all of them failed and we can
                 // just route the FlowFile to failure. Otherwise, some 
messages were successful, so split them off and send them to
                 // 'success' while we send the others to 'failure'.
                 final long offset = lastMessageOffset.get();
-                if ( offset == 0L ) {
+                if (offset == 0L) {
                     // all of the messages failed to send. Route FlowFile to 
failure
-                    getLogger().error("Failed to send {} to Kafka due to {}; 
routing to fialure", new Object[] {flowFile, pe.getCause()});
+                    getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, pe.getCause()});
                     session.transfer(flowFile, REL_FAILURE);
                 } else {
                     // Some of the messages were sent successfully. We want to 
split off the successful messages from the failed messages.
                     final FlowFile successfulMessages = 
session.clone(flowFile, 0L, offset);
                     final FlowFile failedMessages = session.clone(flowFile, 
offset, flowFile.getSize() - offset);
-                    
-                    getLogger().error("Successfully sent {} of the messages 
from {} but then failed to send the rest. Original FlowFile split into two: {} 
routed to 'success', {} routed to 'failure'. Failure was due to {}", new 
Object[] {
-                         messagesSent.get(), flowFile, successfulMessages, 
failedMessages, pe.getCause() });
-                    
+
+                    getLogger().error("Successfully sent {} of the messages 
from {} but then failed to send the rest. Original FlowFile split into"
+                            + " two: {} routed to 'success', {} routed to 
'failure'. Failure was due to {}", new Object[]{
+                        messagesSent.get(), flowFile, successfulMessages, 
failedMessages, pe.getCause()});
+
                     session.transfer(successfulMessages, REL_SUCCESS);
                     session.transfer(failedMessages, REL_FAILURE);
                     session.remove(flowFile);
                     session.getProvenanceReporter().send(successfulMessages, 
"kafka://" + topic);
                 }
             } finally {
-                if ( error ) {
+                if (error) {
                     producer.close();
                 } else {
                     returnProducer(producer);
                 }
             }
-            
+
         }
     }
-    
+
 }
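
[Editor's note] The delimiter scan in onTrigger() above reads the FlowFile one
byte at a time, buffering into a ByteArrayOutputStream and treating a
delimiter match as the end of a message, minus the delimiter bytes themselves.
Below is a simplified, standalone sketch of that loop; a plain suffix
comparison stands in for NonThreadSafeCircularBuffer, and the Max Buffer Size
batching and empty-message skipping are omitted for brevity:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DelimiterScanSketch {
        public static void main(String[] args) throws IOException {
            final byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_8);
            final InputStream in = new ByteArrayInputStream("one\ntwo\nthree".getBytes(StandardCharsets.UTF_8));
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final List<String> messages = new ArrayList<>();
            int nextByte;
            while ((nextByte = in.read()) != -1) {
                baos.write(nextByte);
                final byte[] buf = baos.toByteArray(); // O(n^2) copy; fine for a sketch
                if (endsWith(buf, delimiter)) {
                    // delimiter matched: emit everything except the delimiter itself
                    messages.add(new String(buf, 0, buf.length - delimiter.length, StandardCharsets.UTF_8));
                    baos.reset();
                }
            }
            if (baos.size() > 0) {
                // out of data: whatever is still buffered is the final message
                messages.add(new String(baos.toByteArray(), StandardCharsets.UTF_8));
            }
            System.out.println(messages); // [one, two, three]
        }

        private static boolean endsWith(final byte[] buf, final byte[] suffix) {
            return buf.length >= suffix.length
                    && Arrays.equals(Arrays.copyOfRange(buf, buf.length - suffix.length, buf.length), suffix);
        }
    }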

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
index 60611b6..10c7082 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html
@@ -24,22 +24,22 @@
         <!-- Processor Documentation 
================================================== -->
         <h2>Description:</h2>
         <p>
-               This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a>
-               for data. When a message is received from Kafka, this Processor 
emits a FlowFile
-               where the content of the FlowFile is the value of the Kafka 
message. If the
-               message has a key associated with it, an attribute named 
<code>kafka.key</code>
-               will be added to the FlowFile, with the value being the UTF-8 
Encoded value
-               of the Message's Key.
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data. When a message is received from Kafka, this Processor 
emits a FlowFile
+            where the content of the FlowFile is the value of the Kafka 
message. If the
+            message has a key associated with it, an attribute named 
<code>kafka.key</code>
+            will be added to the FlowFile, with the value being the UTF-8 
Encoded value
+            of the Message's Key.
         </p>
         <p>
-               Kafka supports the notion of a Consumer Group when pulling 
messages in order to
-               provide scalability while still offering a publish-subscribe 
interface. Each
-               Consumer Group must have a unique identifier. The Consumer 
Group identifier that
-               is used by NiFi is the UUID of the Processor. This means that 
all of the nodes
-               within a cluster will use the same Consumer Group Identifier so 
that they do
-               not receive duplicate data but multiple GetKafka Processors can 
be used to pull
-               from multiple Topics, as each Processor will receive a 
different Processor UUID 
-               and therefore a different Consumer Group Identifier.
+            Kafka supports the notion of a Consumer Group when pulling 
messages in order to
+            provide scalability while still offering a publish-subscribe 
interface. Each
+            Consumer Group must have a unique identifier. The Consumer Group 
identifier that
+            is used by NiFi is the UUID of the Processor. This means that all 
of the nodes
+            within a cluster will use the same Consumer Group Identifier so 
that they do
+            not receive duplicate data but multiple GetKafka Processors can be 
used to pull
+            from multiple Topics, as each Processor will receive a different 
Processor UUID 
+            and therefore a different Consumer Group Identifier.
         </p>
     </body>
 </html>
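
[Editor's note] For reference, the consumer configuration that
createConsumers() assembles from these properties looks roughly like the
following (values are illustrative; group.id is the Processor's UUID via
getIdentifier(), and the commit interval comes from the Zookeeper Commit
Frequency property, 60 secs by default):

    zookeeper.connect=host1:2181,host2:2181
    group.id=<processor UUID>
    auto.commit.interval.ms=60000
    auto.commit.enable=true
    auto.offset.reset=smallest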

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
index 04d9463..d51ce95 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
@@ -24,22 +24,22 @@
         <!-- Processor Documentation ================================================== -->
         <h2>Description:</h2>
         <p>
-               This Processors puts the contents of a FlowFile to a Topic in
-               <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
-               a FlowFile becomes the contents of a single message in Kafka.
-               This message is optionally assigned a key by using the
-               &lt;Kafka Key&gt; Property.
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
+            a FlowFile become the contents of a single message in Kafka.
+            This message is optionally assigned a key by using the
+            &lt;Kafka Key&gt; Property.
         </p>
 
-               <p>
-                       The Processor allows the user to configure an optional Message Delimiter that
-                       can be used to send many messages per FlowFile. For example, a \n could be used
-                       to indicate that the contents of the FlowFile should be used to send one message
-                       per line of text. If the property is not set, the entire contents of the FlowFile
-                       will be sent as a single message. When using the delimiter, if some messages are
-                       successfully sent but other messages fail to send, the FlowFile will be FORKed into
-                       two child FlowFiles, with the successfully sent messages being routed to 'success'
-                       and the messages that could not be sent going to 'failure'.
-               </p>
+        <p>
+            The Processor allows the user to configure an optional Message Delimiter that
+            can be used to send many messages per FlowFile. For example, a \n could be used
+            to indicate that the contents of the FlowFile should be used to send one message
+            per line of text. If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the delimiter, if some messages are
+            successfully sent but other messages fail to send, the FlowFile will be FORKed into
+            two child FlowFiles, with the successfully sent messages being routed to 'success'
+            and the messages that could not be sent going to 'failure'.
+        </p>
     </body>
 </html>
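
The FORK-on-partial-failure behavior described above is exactly what testPartialFailure exercises in the diff below. As orientation, here is a condensed, hypothetical sketch of that contract, reusing the test class's private TestableProcessor (a PutKafka subclass whose mock producer fails after a given number of sends):

    // Delimited FlowFile "1\n2\n3\n4": messages 1 and 2 send, message 3 fails.
    final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(2));
    runner.setProperty(PutKafka.TOPIC, "topic1");
    runner.setProperty(PutKafka.KEY, "key1");
    runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
    runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
    runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");

    runner.enqueue("1\n2\n3\n4".getBytes());
    runner.run();

    // The original FlowFile is FORKed into two children.
    runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
    runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
    runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
    runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4");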

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index 10560f8..69ff48c 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -37,14 +37,13 @@ import org.mockito.stubbing.Answer;
 
 public class TestGetKafka {
 
-       
     @BeforeClass
     public static void configureLogging() {
-       System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
+        System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
         System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
         BasicConfigurator.configure();
     }
-    
+
     @Test
     @Ignore("Intended only for local tests to verify functionality.")
     public void testIntegrationLocally() {
@@ -53,24 +52,23 @@ public class TestGetKafka {
         runner.setProperty(GetKafka.TOPIC, "testX");
         runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
-        
+
         runner.run(20, false);
-        
+
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
-        for ( final MockFlowFile flowFile : flowFiles ) {
-               System.out.println(flowFile.getAttributes());
-               System.out.println(new String(flowFile.toByteArray()));
-               System.out.println();
+        for (final MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            System.out.println(new String(flowFile.toByteArray()));
+            System.out.println();
         }
     }
-    
-    
+
     @Test
     public void testWithDelimiter() {
         final List<String> messages = new ArrayList<>();
         messages.add("Hello");
         messages.add("Good-bye");
-        
+
         final TestableProcessor proc = new TestableProcessor(null, messages);
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
@@ -79,20 +77,20 @@ public class TestGetKafka {
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
         runner.setProperty(GetKafka.BATCH_SIZE, "2");
-        
+
         runner.run();
-        
+
         runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
         final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
         mff.assertContentEquals("Hello\nGood-bye");
     }
-    
+
     @Test
     public void testWithDelimiterAndNotEnoughMessages() {
         final List<String> messages = new ArrayList<>();
         messages.add("Hello");
         messages.add("Good-bye");
-        
+
         final TestableProcessor proc = new TestableProcessor(null, messages);
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
@@ -101,40 +99,40 @@ public class TestGetKafka {
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
         runner.setProperty(GetKafka.BATCH_SIZE, "3");
-        
+
         runner.run();
-        
+
         runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
         final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
         mff.assertContentEquals("Hello\nGood-bye");
     }
-    
-    
+
     private static class TestableProcessor extends GetKafka {
+
         private final byte[] key;
         private final Iterator<String> messageItr;
-        
+
         public TestableProcessor(final byte[] key, final List<String> 
messages) {
             this.key = key;
             messageItr = messages.iterator();
         }
-        
+
         @Override
         public void createConsumers(ProcessContext context) {
         }
-        
+
         @Override
         @SuppressWarnings({"unchecked", "rawtypes"})
         protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
             final ConsumerIterator<byte[], byte[]> itr = Mockito.mock(ConsumerIterator.class);
-            
+
             Mockito.doAnswer(new Answer<Boolean>() {
                 @Override
                 public Boolean answer(final InvocationOnMock invocation) throws Throwable {
                     return messageItr.hasNext();
                 }
             }).when(itr).hasNext();
-            
+
             Mockito.doAnswer(new Answer<MessageAndMetadata>() {
                 @Override
                 public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
@@ -142,21 +140,21 @@ public class TestGetKafka {
                     Mockito.when(mam.key()).thenReturn(key);
                     Mockito.when(mam.offset()).thenReturn(0L);
                     Mockito.when(mam.partition()).thenReturn(0);
-                    
+
                     Mockito.doAnswer(new Answer<byte[]>() {
                         @Override
                         public byte[] answer(InvocationOnMock invocation) throws Throwable {
                             return messageItr.next().getBytes();
                         }
-                        
+
                     }).when(mam).message();
-                    
+
                     return mam;
                 }
             }).when(itr).next();
-            
+
             return itr;
         }
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 56a5c4b..9500e29 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -36,13 +36,19 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.provenance.ProvenanceReporter;
-import org.apache.nifi.util.*;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockFlowFileQueue;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.MockProvenanceReporter;
+import org.apache.nifi.util.MockSessionFactory;
+import org.apache.nifi.util.SharedSessionState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
-
 public class TestPutKafka {
 
     @Test
@@ -53,15 +59,15 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        
+
         runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
         runner.run();
-        
+
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-        
+
         final List<byte[]> messages = proc.getProducer().getMessages();
         assertEquals(11, messages.size());
-        
+
         assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
         assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
         assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
@@ -74,8 +80,7 @@ public class TestPutKafka {
         assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
         assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
     }
-    
-    
+
     @Test
     public void testWithImmediateFailure() {
         final TestableProcessor proc = new TestableProcessor(0);
@@ -84,17 +89,16 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        
+
         final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
         runner.enqueue(text.getBytes());
         runner.run();
-        
+
         runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
         final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
         mff.assertContentEquals(text);
     }
-    
-    
+
     @Test
     public void testPartialFailure() {
         final TestableProcessor proc = new TestableProcessor(2);
@@ -104,22 +108,21 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
         runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
-        
+
         final byte[] bytes = "1\n2\n3\n4".getBytes();
         runner.enqueue(bytes);
         runner.run();
-        
+
         runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
         runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
 
         final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
         successFF.assertContentEquals("1\n2\n");
-        
+
         final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
         failureFF.assertContentEquals("3\n4");
     }
-    
-    
+
     @Test
     public void testWithEmptyMessages() {
         final TestableProcessor proc = new TestableProcessor();
@@ -128,11 +131,11 @@ public class TestPutKafka {
         runner.setProperty(PutKafka.KEY, "key1");
         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        
+
         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
         runner.enqueue(bytes);
         runner.run();
-        
+
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
         final List<byte[]> msgs = proc.getProducer().getMessages();
@@ -144,7 +147,7 @@ public class TestPutKafka {
     }
 
     @Test
-    public void testProvenanceReporterMessagesCount(){
+    public void testProvenanceReporterMessagesCount() {
         final TestableProcessor processor = new TestableProcessor();
 
         ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
@@ -157,7 +160,6 @@ public class TestPutKafka {
         MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
         Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
 
-
         final TestRunner runner = TestRunners.newTestRunner(processor);
         Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
         Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
@@ -176,7 +178,7 @@ public class TestPutKafka {
     }
 
     @Test
-    public void testProvenanceReporterWithoutDelimiterMessagesCount(){
+    public void testProvenanceReporterWithoutDelimiterMessagesCount() {
         final TestableProcessor processor = new TestableProcessor();
 
         ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
@@ -189,7 +191,6 @@ public class TestPutKafka {
         MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
         Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
 
-
         final TestRunner runner = TestRunners.newTestRunner(processor);
         Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
         Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
@@ -206,97 +207,97 @@ public class TestPutKafka {
         Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1");
     }
 
-       @Test
-       @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
-       public void testKeyValuePut() {
-               final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
-               runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
-               runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
-               runner.setProperty(PutKafka.KEY, "${kafka.key}");
-               runner.setProperty(PutKafka.TIMEOUT, "3 secs");
-               runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
-               
-               final Map<String, String> attributes = new HashMap<>();
-               attributes.put("kafka.topic", "test");
-               attributes.put("kafka.key", "key3");
-               
-               final byte[] data = "Hello, World, Again! ;)".getBytes();
-               runner.enqueue(data, attributes);
-               runner.enqueue(data, attributes);
-               runner.enqueue(data, attributes);
-               runner.enqueue(data, attributes);
-               
-               runner.run(5);
-               
-               runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
-               final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
-               final MockFlowFile mff = mffs.get(0);
-               
-               assertTrue(Arrays.equals(data, mff.toByteArray()));
-       }
-       
-       
-       private static class TestableProcessor extends PutKafka {
-           private MockProducer producer;
-           private int failAfter = Integer.MAX_VALUE;
-           
-           public TestableProcessor() {
-           }
-           
-           public TestableProcessor(final int failAfter) {
-               this.failAfter = failAfter;
-           }
-           
-           @OnScheduled
-           public void instantiateProducer(final ProcessContext context) {
-               producer = new MockProducer(createConfig(context));
-               producer.setFailAfter(failAfter);
-           }
-           
-           @Override
-           protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
-               return producer;
-           }
-           
-           public MockProducer getProducer() {
-               return producer;
-           }
-       }
-       
-       
-       private static class MockProducer extends Producer<byte[], byte[]> {
-           private int sendCount = 0;
-           private int failAfter = Integer.MAX_VALUE;
-           
-           private final List<byte[]> messages = new ArrayList<>();
-           
+    @Test
+    @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
+    public void testKeyValuePut() {
+        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+        runner.setProperty(PutKafka.KEY, "${kafka.key}");
+        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", "test");
+        attributes.put("kafka.key", "key3");
+
+        final byte[] data = "Hello, World, Again! ;)".getBytes();
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+
+        runner.run(5);
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+        final MockFlowFile mff = mffs.get(0);
+
+        assertTrue(Arrays.equals(data, mff.toByteArray()));
+    }
+
+    private static class TestableProcessor extends PutKafka {
+
+        private MockProducer producer;
+        private int failAfter = Integer.MAX_VALUE;
+
+        public TestableProcessor() {
+        }
+
+        public TestableProcessor(final int failAfter) {
+            this.failAfter = failAfter;
+        }
+
+        @OnScheduled
+        public void instantiateProducer(final ProcessContext context) {
+            producer = new MockProducer(createConfig(context));
+            producer.setFailAfter(failAfter);
+        }
+
+        @Override
+        protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+            return producer;
+        }
+
+        public MockProducer getProducer() {
+            return producer;
+        }
+    }
+
+    private static class MockProducer extends Producer<byte[], byte[]> {
+
+        private int sendCount = 0;
+        private int failAfter = Integer.MAX_VALUE;
+
+        private final List<byte[]> messages = new ArrayList<>();
+
         public MockProducer(final ProducerConfig config) {
             super(config);
         }
-           
+
         @Override
         public void send(final KeyedMessage<byte[], byte[]> message) {
-            if ( ++sendCount > failAfter ) {
+            if (++sendCount > failAfter) {
                 throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
             } else {
                 messages.add(message.message());
             }
         }
-        
+
         public List<byte[]> getMessages() {
             return messages;
         }
-        
+
         @Override
         public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
-            for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+            for (final KeyedMessage<byte[], byte[]> msg : messages) {
                 send(msg);
             }
         }
-        
+
         public void setFailAfter(final int successCount) {
             failAfter = successCount;
         }
-       }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8f2502c4/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index 0acb59e..74fef70 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -26,12 +26,12 @@
         <module>nifi-kafka-nar</module>
     </modules>
     <dependencyManagement>
-       <dependencies>
-          <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-kafka-processors</artifactId>
-            <version>0.1.0-incubating-SNAPSHOT</version>
-          </dependency>
-       </dependencies>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-kafka-processors</artifactId>
+                <version>0.1.0-incubating-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
     </dependencyManagement> 
 </project>
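
Since nifi-kafka-processors is pinned above via dependencyManagement, sibling modules can declare the dependency without repeating the version. A minimal, hypothetical consumer fragment (module layout assumed from the <modules> list above):

    <!-- e.g. in nifi-kafka-nar/pom.xml: the version is inherited from the
         bundle parent's dependencyManagement section. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-kafka-processors</artifactId>
        </dependency>
    </dependencies>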
