http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 0000000,51f9ef1..4df4e08
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@@ -1,0 -1,419 +1,419 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.kafka;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Pattern;
+ 
+ import kafka.javaapi.producer.Producer;
+ import kafka.producer.KeyedMessage;
+ import kafka.producer.ProducerConfig;
+ 
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.ByteArrayOutputStream;
+ import org.apache.nifi.stream.io.ByteCountingInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+ import org.apache.nifi.util.LongHolder;
+ 
+ import scala.actors.threadpool.Arrays;
+ 
+ @SupportsBatching
+ @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+ @CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka")
+ public class PutKafka extends AbstractProcessor {
+     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+     
+     public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed 
to failure unless the message is replicated to the appropriate number of Kafka 
Nodes according to the Topic configuration");
+     public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed 
to success if the message is received by a single Kafka node, whether or not it 
is replicated. This is faster than <Guarantee Replicated Delivery> but can 
result in data loss if a Kafka node crashes");
+     public static final AllowableValue DELIVERY_BEST_EFFORT = new 
AllowableValue("0", "Best Effort", "FlowFile will be routed to success after 
successfully writing the content to a Kafka node, without waiting for a 
response. This provides the best performance but may result in data loss.");
+     
+     public static final PropertyDescriptor SEED_BROKERS = new 
PropertyDescriptor.Builder()
+         .name("Known Brokers")
+         .description("A comma-separated list of known Kafka Brokers in the 
format <host>:<port>")
+         .required(true)
+         
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+         .expressionLanguageSupported(false)
+         .build();
+     public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+           .name("Topic Name")
+           .description("The Kafka Topic of interest")
+           .required(true)
+           .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+           .expressionLanguageSupported(true)
+           .build();
+     public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
+               .name("Kafka Key")
+               .description("The Key to use for the Message")
+               .required(false)
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+               .expressionLanguageSupported(true)
+               .build();
+     public static final PropertyDescriptor DELIVERY_GUARANTEE = new 
PropertyDescriptor.Builder()
+               .name("Delivery Guarantee")
+               .description("Specifies the requirement for guaranteeing that a 
message is sent to Kafka")
+               .required(true)
+               .expressionLanguageSupported(false)
+               .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, 
DELIVERY_REPLICATED)
+               .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+               .build();
+     public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
+         .name("Message Delimiter")
+         .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
+                 + "If not specified, the entire content of the FlowFile will 
be used as a single message. "
+                 + "If specified, the contents of the FlowFile will be split 
on this delimiter and each section "
+                 + "sent as a separate Kafka message.")
+         .required(false)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(true)
+         .build();
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+         .name("Max Buffer Size")
+         .description("The maximum amount of data to buffer in memory before 
sending to Kafka")
+         .required(true)
+         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("1 MB")
+         .build();
+     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
+           .name("Communications Timeout")
+           .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+           .required(true)
+           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+           .expressionLanguageSupported(false)
+           .defaultValue("30 secs")
+           .build();
+     public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
+           .name("Client Name")
+           .description("Client Name to use when communicating with Kafka")
+           .required(true)
+           .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+           .expressionLanguageSupported(false)
+           .build();
+ 
+     
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+           .name("success")
+           .description("Any FlowFile that is successfully sent to Kafka will 
be routed to this Relationship")
+           .build();
+     public static final Relationship REL_FAILURE = new Relationship.Builder()
+           .name("failure")
+           .description("Any FlowFile that cannot be sent to Kafka will be 
routed to this Relationship")
+           .build();
+ 
+     private final BlockingQueue<Producer<byte[], byte[]>> producers = new 
LinkedBlockingQueue<>();
+     
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+       final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
+               .fromPropertyDescriptor(CLIENT_NAME)
+               .defaultValue("NiFi-" + getIdentifier())
+               .build();
+       
+         final List<PropertyDescriptor> props = new ArrayList<>();
+         props.add(SEED_BROKERS);
+         props.add(TOPIC);
+         props.add(KEY);
+         props.add(DELIVERY_GUARANTEE);
+         props.add(MESSAGE_DELIMITER);
+         props.add(MAX_BUFFER_SIZE);
+         props.add(TIMEOUT);
+         props.add(clientName);
+         return props;
+     }
+     
+     @Override
+     public Set<Relationship> getRelationships() {
+         final Set<Relationship> relationships = new HashSet<>(1);
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         return relationships;
+     }
+     
+     
+     @OnStopped
+     public void closeProducers() {
+       Producer<byte[], byte[]> producer;
+       
+       while ((producer = producers.poll()) != null) {
+               producer.close();
+       }
+     }
+     
+     protected ProducerConfig createConfig(final ProcessContext context) {
+         final String brokers = context.getProperty(SEED_BROKERS).getValue();
+ 
+         final Properties properties = new Properties();
+         properties.setProperty("metadata.broker.list", brokers);
+         properties.setProperty("request.required.acks", 
context.getProperty(DELIVERY_GUARANTEE).getValue());
+         properties.setProperty("client.id", 
context.getProperty(CLIENT_NAME).getValue());
+         properties.setProperty("request.timeout.ms", 
String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
+         
+         properties.setProperty("message.send.max.retries", "1");
+         properties.setProperty("producer.type", "sync");
+         
+         return new ProducerConfig(properties);
+     }
+     
+     protected Producer<byte[], byte[]> createProducer(final ProcessContext 
context) {
+       return new Producer<>(createConfig(context));
+     }
+     
+     private Producer<byte[], byte[]> borrowProducer(final ProcessContext 
context) {
+       Producer<byte[], byte[]> producer = producers.poll();
+       return producer == null ? createProducer(context) : producer;
+     }
+     
+     private void returnProducer(final Producer<byte[], byte[]> producer) {
+       producers.offer(producer);
+     }
+     
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+       FlowFile flowFile = session.get();
+       if ( flowFile == null ) {
+               return;
+       }
+       
+       final long start = System.nanoTime();
+         final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+         final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+         final byte[] keyBytes = (key == null) ? null : 
key.getBytes(StandardCharsets.UTF_8);
+         String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+         if ( delimiter != null ) {
+             delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
+         }
+         
+         final long maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
+         final Producer<byte[], byte[]> producer = borrowProducer(context);
+         
+         if ( delimiter == null ) {
+             // Send the entire FlowFile as a single message.
+             final byte[] value = new byte[(int) flowFile.getSize()];
+             session.read(flowFile, new InputStreamCallback() {
+                       @Override
+                       public void process(final InputStream in) throws 
IOException {
+                               StreamUtils.fillBuffer(in, value);
+                       }
+             });
+             
+             boolean error = false;
+             try {
+                 final KeyedMessage<byte[], byte[]> message;
+                 if ( key == null ) {
+                     message = new KeyedMessage<>(topic, value);
+                 } else {
+                     message = new KeyedMessage<>(topic, keyBytes, value);
+                 }
+                 
+                 producer.send(message);
+                 final long nanos = System.nanoTime() - start;
+                 
+                 session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic);
+                 session.transfer(flowFile, REL_SUCCESS);
+                 getLogger().info("Successfully sent {} to Kafka in {} 
millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+             } catch (final Exception e) {
+                 getLogger().error("Failed to send {} to Kafka due to {}; 
routing to failure", new Object[] {flowFile, e});
+                 session.transfer(flowFile, REL_FAILURE);
+                 error = true;
+             } finally {
+                 if ( error ) {
+                     producer.close();
+                 } else {
+                     returnProducer(producer);
+                 }
+             }
+         } else {
+             final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
+             
+             // The NonThreadSafeCircularBuffer allows us to add a byte from 
the stream one at a time and see
+             // if it matches some pattern. We can use this to search for the 
delimiter as we read through
+             // the stream of bytes in the FlowFile
+             final NonThreadSafeCircularBuffer buffer = new 
NonThreadSafeCircularBuffer(delimiterBytes);
+             
+             boolean error = false;
+             final LongHolder lastMessageOffset = new LongHolder(0L);
+             final LongHolder messagesSent = new LongHolder(0L);
+             
+             try (final ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
+                 session.read(flowFile, new InputStreamCallback() {
+                     @Override
+                     public void process(final InputStream rawIn) throws 
IOException {
+                         byte[] data = null; // contents of a single message
+                         
+                         boolean streamFinished = false;
+                         
+                         final List<KeyedMessage<byte[], byte[]>> messages = 
new ArrayList<>(); // batch to send
+                         long messageBytes = 0L; // size of messages in the 
'messages' list
+                         
+                         int nextByte;
+                         try (final InputStream bufferedIn = new 
BufferedInputStream(rawIn);
+                              final ByteCountingInputStream in = new 
ByteCountingInputStream(bufferedIn)) {
+                             
+                             // read until we're out of data.
+                             while (!streamFinished) {
+                                 nextByte = in.read();
+ 
+                                 if ( nextByte > -1 ) {
+                                     baos.write(nextByte);
+                                 }
+                                 
+                                 if (nextByte == -1) {
+                                     // we ran out of data. This message is 
complete.
+                                     data = baos.toByteArray();
+                                     streamFinished = true;
+                                 } else if ( buffer.addAndCompare((byte) 
nextByte) ) {
+                                     // we matched our delimiter. This message 
is complete. We want all of the bytes from the
+                                     // underlying BAOS exception for the last 
'delimiterBytes.length' bytes because we don't want
+                                     // the delimiter itself to be sent.
+                                     data = 
Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - 
delimiterBytes.length);
+                                 }
+                                 
+                                 createMessage: if ( data != null ) {
+                                     // If the message has no data, ignore it.
+                                     if ( data.length == 0 ) {
+                                         data = null;
+                                         baos.reset();
+                                         break createMessage;
+                                     }
+                                     
+                                     // either we ran out of data or we 
reached the end of the message. 
+                                     // Either way, create the message because 
it's ready to send.
+                                     final KeyedMessage<byte[], byte[]> 
message;
+                                     if ( key == null ) {
+                                         message = new KeyedMessage<>(topic, 
data);
+                                     } else {
+                                         message = new KeyedMessage<>(topic, 
keyBytes, data);
+                                     }
+                                     
+                                     // Add the message to the list of 
messages ready to send. If we've reached our
+                                     // threshold of how many we're willing to 
send (or if we're out of data), go ahead
+                                     // and send the whole List.
+                                     messages.add(message);
+                                     messageBytes += data.length;
+                                     if ( messageBytes >= maxBufferSize || 
streamFinished ) {
+                                         // send the messages, then reset our 
state.
+                                         try {
+                                             producer.send(messages);
+                                         } catch (final Exception e) {
+                                             // we wrap the general exception 
in ProcessException because we want to separate
+                                             // failures in sending messages 
from general Exceptions that would indicate bugs
+                                             // in the Processor. Failure to 
send a message should be handled appropriately, but
+                                             // we don't want to catch the 
general Exception or RuntimeException in order to catch
+                                             // failures from Kafka's Producer.
+                                             throw new 
ProcessException("Failed to send messages to Kafka", e);
+                                         }
+                                         
+                                         
messagesSent.addAndGet(messages.size());    // count number of messages sent
+                                         
+                                         // reset state
+                                         messages.clear();
+                                         messageBytes = 0;
+                                         
+                                         // We've successfully sent a batch of 
messages. Keep track of the byte offset in the
+                                         // FlowFile of the last successfully 
sent message. This way, if the messages cannot
+                                         // all be successfully sent, we know 
where to split off the data. This allows us to then
+                                         // split off the first X number of 
bytes and send to 'success' and then split off the rest
+                                         // and send them to 'failure'.
+                                         
lastMessageOffset.set(in.getBytesConsumed());
+                                     }
+                                     
+                                     // reset BAOS so that we can start a new 
message.
+                                     baos.reset();
+                                     data = null;
+                                 }
+                             }
+ 
+                             // If there are messages left, send them
+                             if ( !messages.isEmpty() ) {
+                                 try {
+                                     producer.send(messages);
+                                 } catch (final Exception e) {
+                                     throw new ProcessException("Failed to 
send messages to Kafka", e);
+                                 }
+                             }
+                         }
+                     }
+                 });
+                 
+                 final long nanos = System.nanoTime() - start;
+                 
+                 session.getProvenanceReporter().send(flowFile, "kafka://" + 
topic, "Sent " + messagesSent.get() + " messages");
+                 session.transfer(flowFile, REL_SUCCESS);
+                 getLogger().info("Successfully sent {} messages to Kafka for 
{} in {} millis", new Object[] {messagesSent.get(), flowFile, 
TimeUnit.NANOSECONDS.toMillis(nanos)});
+             } catch (final ProcessException pe) {
+                 error = true;
+                 
+                 // There was a failure sending messages to Kafka. Iff the 
lastMessageOffset is 0, then all of them failed and we can
+                 // just route the FlowFile to failure. Otherwise, some 
messages were successful, so split them off and send them to
+                 // 'success' while we send the others to 'failure'.
+                 final long offset = lastMessageOffset.get();
+                 if ( offset == 0L ) {
+                     // all of the messages failed to send. Route FlowFile to 
failure
+                     getLogger().error("Failed to send {} to Kafka due to {}; 
routing to fialure", new Object[] {flowFile, pe.getCause()});
+                     session.transfer(flowFile, REL_FAILURE);
+                 } else {
+                     // Some of the messages were sent successfully. We want 
to split off the successful messages from the failed messages.
+                     final FlowFile successfulMessages = 
session.clone(flowFile, 0L, offset);
+                     final FlowFile failedMessages = session.clone(flowFile, 
offset, flowFile.getSize() - offset);
+                     
+                     getLogger().error("Successfully sent {} of the messages 
from {} but then failed to send the rest. Original FlowFile split into two: {} 
routed to 'success', {} routed to 'failure'. Failure was due to {}", new 
Object[] {
+                          messagesSent.get(), flowFile, successfulMessages, 
failedMessages, pe.getCause() });
+                     
+                     session.transfer(successfulMessages, REL_SUCCESS);
+                     session.transfer(failedMessages, REL_FAILURE);
+                     session.remove(flowFile);
+                     session.getProvenanceReporter().send(successfulMessages, 
"kafka://" + topic);
+                 }
+             } finally {
+                 if ( error ) {
+                     producer.close();
+                 } else {
+                     returnProducer(producer);
+                 }
+             }
+             
+         }
+     }
+     
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 0000000,06a4604..b0f2394
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@@ -1,0 -1,236 +1,236 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.kafka;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertTrue;
+ 
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import kafka.common.FailedToSendMessageException;
+ import kafka.javaapi.producer.Producer;
+ import kafka.producer.KeyedMessage;
+ import kafka.producer.ProducerConfig;
+ 
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.processor.ProcessContext;
 -import org.apache.nifi.processor.annotation.OnScheduled;
+ import org.apache.nifi.util.MockFlowFile;
+ import org.apache.nifi.util.TestRunner;
+ import org.apache.nifi.util.TestRunners;
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ 
+ public class TestPutKafka {
+ 
+     @Test
+     public void testMultipleKeyValuePerFlowFile() {
+         final TestableProcessor proc = new TestableProcessor();
+         final TestRunner runner = TestRunners.newTestRunner(proc);
+         runner.setProperty(PutKafka.TOPIC, "topic1");
+         runner.setProperty(PutKafka.KEY, "key1");
+         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+         
+         runner.enqueue("Hello 
World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
+         runner.run();
+         
+         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+         
+         final List<byte[]> messages = proc.getProducer().getMessages();
+         assertEquals(11, messages.size());
+         
+         assertTrue(Arrays.equals("Hello 
World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
+         assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), 
messages.get(1)));
+         assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), 
messages.get(2)));
+         assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), 
messages.get(3)));
+         assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), 
messages.get(4)));
+         assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), 
messages.get(5)));
+         assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), 
messages.get(6)));
+         assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), 
messages.get(7)));
+         assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), 
messages.get(8)));
+         assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), 
messages.get(9)));
+         assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), 
messages.get(10)));
+     }
+     
+     
+     @Test
+     public void testWithImmediateFailure() {
+         final TestableProcessor proc = new TestableProcessor(0);
+         final TestRunner runner = TestRunners.newTestRunner(proc);
+         runner.setProperty(PutKafka.TOPIC, "topic1");
+         runner.setProperty(PutKafka.KEY, "key1");
+         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+         
+         final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+         runner.enqueue(text.getBytes());
+         runner.run();
+         
+         runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+         final MockFlowFile mff = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+         mff.assertContentEquals(text);
+     }
+     
+     
+     @Test
+     public void testPartialFailure() {
+         final TestableProcessor proc = new TestableProcessor(2);
+         final TestRunner runner = TestRunners.newTestRunner(proc);
+         runner.setProperty(PutKafka.TOPIC, "topic1");
+         runner.setProperty(PutKafka.KEY, "key1");
+         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+         runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+         
+         final byte[] bytes = "1\n2\n3\n4".getBytes();
+         runner.enqueue(bytes);
+         runner.run();
+         
+         runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+         runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+ 
+         final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+         successFF.assertContentEquals("1\n2\n");
+         
+         final MockFlowFile failureFF = 
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+         failureFF.assertContentEquals("3\n4");
+     }
+     
+     
+     @Test
+     public void testWithEmptyMessages() {
+         final TestableProcessor proc = new TestableProcessor();
+         final TestRunner runner = TestRunners.newTestRunner(proc);
+         runner.setProperty(PutKafka.TOPIC, "topic1");
+         runner.setProperty(PutKafka.KEY, "key1");
+         runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+         runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+         
+         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+         runner.enqueue(bytes);
+         runner.run();
+         
+         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+ 
+         final List<byte[]> msgs = proc.getProducer().getMessages();
+         assertEquals(4, msgs.size());
+         assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
+         assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
+         assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
+         assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+     }
+     
+     
+       @Test
+       @Ignore("Intended only for local testing; requires an actual running 
instance of Kafka & ZooKeeper...")
+       public void testKeyValuePut() {
+               final TestRunner runner = 
TestRunners.newTestRunner(PutKafka.class);
+               runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+               runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+               runner.setProperty(PutKafka.KEY, "${kafka.key}");
+               runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+               runner.setProperty(PutKafka.DELIVERY_GUARANTEE, 
PutKafka.DELIVERY_REPLICATED.getValue());
+               
+               final Map<String, String> attributes = new HashMap<>();
+               attributes.put("kafka.topic", "test");
+               attributes.put("kafka.key", "key3");
+               
+               final byte[] data = "Hello, World, Again! ;)".getBytes();
+               runner.enqueue(data, attributes);
+               runner.enqueue(data, attributes);
+               runner.enqueue(data, attributes);
+               runner.enqueue(data, attributes);
+               
+               runner.run(5);
+               
+               runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
+               final List<MockFlowFile> mffs = 
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+               final MockFlowFile mff = mffs.get(0);
+               
+               assertTrue(Arrays.equals(data, mff.toByteArray()));
+       }
+       
+       
+       private static class TestableProcessor extends PutKafka {
+           private MockProducer producer;
+           private int failAfter = Integer.MAX_VALUE;
+           
+           public TestableProcessor() {
+           }
+           
+           public TestableProcessor(final int failAfter) {
+               this.failAfter = failAfter;
+           }
+           
+           @OnScheduled
+           public void instantiateProducer(final ProcessContext context) {
+               producer = new MockProducer(createConfig(context));
+               producer.setFailAfter(failAfter);
+           }
+           
+           @Override
+           protected Producer<byte[], byte[]> createProducer(final 
ProcessContext context) {
+               return producer;
+           }
+           
+           public MockProducer getProducer() {
+               return producer;
+           }
+       }
+       
+       
+       private static class MockProducer extends Producer<byte[], byte[]> {
+           private int sendCount = 0;
+           private int failAfter = Integer.MAX_VALUE;
+           
+           private final List<byte[]> messages = new ArrayList<>();
+           
+         public MockProducer(final ProducerConfig config) {
+             super(config);
+         }
+           
+         @Override
+         public void send(final KeyedMessage<byte[], byte[]> message) {
+             if ( ++sendCount > failAfter ) {
+                 throw new FailedToSendMessageException("Failed to send 
message", new RuntimeException("Unit test told to fail after " + failAfter + " 
successful messages"));
+             } else {
+                 messages.add(message.message());
+             }
+         }
+         
+         public List<byte[]> getMessages() {
+             return messages;
+         }
+         
+         @Override
+         public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
+             for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+                 send(msg);
+             }
+         }
+         
+         public void setFailAfter(final int successCount) {
+             failAfter = successCount;
+         }
+       }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 0000000,521bd94..d9175e0
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@@ -1,0 -1,142 +1,145 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
 -import org.apache.nifi.processor.ProcessContext;
 -import org.apache.nifi.processor.AbstractProcessor;
 -import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.Relationship;
++import java.io.IOException;
++import java.io.InputStream;
++import java.io.OutputStream;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Set;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.commons.codec.binary.Base64InputStream;
++import org.apache.commons.codec.binary.Base64OutputStream;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.processor.AbstractProcessor;
++import org.apache.nifi.processor.ProcessContext;
++import org.apache.nifi.processor.ProcessSession;
++import org.apache.nifi.processor.ProcessorInitializationContext;
++import org.apache.nifi.processor.Relationship;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
+ import org.apache.nifi.util.StopWatch;
+ 
 -import org.apache.commons.codec.binary.Base64InputStream;
 -import org.apache.commons.codec.binary.Base64OutputStream;
 -
 -import java.io.IOException;
 -import java.io.InputStream;
 -import java.io.OutputStream;
 -import java.util.*;
 -import java.util.concurrent.TimeUnit;
 -
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"experimental", "encode", "base64"})
+ @CapabilityDescription("Encodes the FlowFile content in base64")
+ public class Base64EncodeContent extends AbstractProcessor {
+ 
+     public static final String ENCODE_MODE = "Encode";
+     public static final String DECODE_MODE = "Decode";
+ 
+     public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+             .name("Mode")
+             .description("Specifies whether the content should be encoded or 
decoded")
+             .required(true)
+             .allowableValues(ENCODE_MODE, DECODE_MODE)
+             .defaultValue(ENCODE_MODE)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("Any FlowFile that is 
successfully encoded or decoded will be routed to success").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("Any FlowFile that cannot be 
encoded or decoded will be routed to failure").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(MODE);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+ 
+         boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+         try {
+             final StopWatch stopWatch = new StopWatch(true);
+             if (encode) {
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(InputStream in, OutputStream out) 
throws IOException {
+                         try (Base64OutputStream bos = new 
Base64OutputStream(out)) {
+                             int len = -1;
+                             byte[] buf = new byte[8192];
+                             while ((len = in.read(buf)) > 0) {
+                                 bos.write(buf, 0, len);
+                             }
+                             bos.flush();
+                         }
+                     }
+                 });
+             } else {
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(InputStream in, OutputStream out) 
throws IOException {
+                         try (Base64InputStream bis = new 
Base64InputStream(new ValidatingBase64InputStream(in))) {
+                             int len = -1;
+                             byte[] buf = new byte[8192];
+                             while ((len = bis.read(buf)) > 0) {
+                                 out.write(buf, 0, len);
+                             }
+                             out.flush();
+                         }
+                     }
+                 });
+             }
+ 
+             logger.info("Successfully {} {}", new Object[]{encode ? "encoded" 
: "decoded", flowFile});
+             session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (Exception e) {
+             logger.error("Failed to {} {} due to {}", new Object[]{encode ? 
"encode" : "decode", flowFile, e});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index 0000000,21dfe93..cf20f16
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@@ -1,0 -1,307 +1,307 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.Closeable;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ import lzma.sdk.lzma.Decoder;
+ import lzma.streams.LzmaInputStream;
+ import lzma.streams.LzmaOutputStream;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.GZIPOutputStream;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.apache.commons.compress.compressors.CompressorStreamFactory;
+ import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+ import org.tukaani.xz.LZMA2Options;
+ import org.tukaani.xz.XZInputStream;
+ import org.tukaani.xz.XZOutputStream;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", 
"xz-lzma2"})
+ @CapabilityDescription("Compresses or decompresses the contents of FlowFiles 
using a user-specified compression algorithm and updates the mime.type 
attribute as appropriate")
+ public class CompressContent extends AbstractProcessor {
+ 
+     public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type 
attribute";
+     public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+     public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
+     public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
+     public static final String COMPRESSION_FORMAT_LZMA = "lzma";
+ 
+     public static final String MODE_COMPRESS = "compress";
+     public static final String MODE_DECOMPRESS = "decompress";
+ 
+     public static final PropertyDescriptor COMPRESSION_FORMAT = new 
PropertyDescriptor.Builder()
+             .name("Compression Format")
+             .description("The compression format to use. Valid values are: 
GZIP, BZIP2, XZ-LZMA2, and LZMA")
+             .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, 
COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, 
COMPRESSION_FORMAT_LZMA)
+             .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
+             .required(true)
+             .build();
+     public static final PropertyDescriptor COMPRESSION_LEVEL = new 
PropertyDescriptor.Builder()
+             .name("Compression Level")
+             .description("The compression level to use; this is valid only 
when using GZIP compression. A lower value results in faster processing but 
less compression; a value of 0 indicates no compression but simply archiving")
+             .defaultValue("1")
+             .required(true)
+             .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+             .build();
+     public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+             .name("Mode")
+             .description("Indicates whether the processor should compress 
content or decompress content. Must be either 'compress' or 'decompress'")
+             .allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
+             .defaultValue(MODE_COMPRESS)
+             .required(true)
+             .build();
+     public static final PropertyDescriptor UPDATE_FILENAME = new 
PropertyDescriptor.Builder()
+             .name("Update Filename")
+             .description("If true, will remove the filename extension when 
decompressing data (only if the extension indicates the appropriate compression 
format) and add the appropriate extension when compressing data")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("FlowFiles will be 
transferred to the success relationship after successfully being compressed or 
decompressed").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("FlowFiles will be 
transferred to the failure relationship if they fail to 
compress/decompress").build();
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+     private Map<String, String> compressionFormatMimeTypeMap;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(MODE);
+         properties.add(COMPRESSION_FORMAT);
+         properties.add(COMPRESSION_LEVEL);
+         properties.add(UPDATE_FILENAME);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final Map<String, String> mimeTypeMap = new HashMap<>();
+         mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
+         mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
+         mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
+         this.compressionFormatMimeTypeMap = 
Collections.unmodifiableMap(mimeTypeMap);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final long sizeBeforeCompression = flowFile.getSize();
+         final String compressionMode = context.getProperty(MODE).getValue();
+ 
+         String compressionFormatValue = 
context.getProperty(COMPRESSION_FORMAT).getValue();
+         if (compressionFormatValue.equals(COMPRESSION_FORMAT_ATTRIBUTE)) {
+             final String mimeType = 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+             if (mimeType == null) {
+                 logger.error("No {} attribute exists for {}; routing to 
failure", new Object[]{CoreAttributes.MIME_TYPE.key(), flowFile});
+                 session.transfer(flowFile, REL_FAILURE);
+                 return;
+             }
+ 
+             compressionFormatValue = 
compressionFormatMimeTypeMap.get(mimeType);
+             if (compressionFormatValue == null) {
+                 logger.info("Mime Type of {} is '{}', which does not indicate 
a supported Compression Format; routing to success without decompressing",
+                         new Object[]{flowFile, mimeType});
+                 session.transfer(flowFile, REL_SUCCESS);
+                 return;
+             }
+         }
+ 
+         final String compressionFormat = compressionFormatValue;
+         final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null);
+         final StopWatch stopWatch = new StopWatch(true);
+ 
+         final String fileExtension;
+         switch (compressionFormat.toLowerCase()) {
+             case COMPRESSION_FORMAT_GZIP:
+                 fileExtension = ".gz";
+                 break;
+             case COMPRESSION_FORMAT_LZMA:
+                 fileExtension = ".lzma";
+                 break;
+             case COMPRESSION_FORMAT_XZ_LZMA2:
+                 fileExtension = ".xz";
+                 break;
+             case COMPRESSION_FORMAT_BZIP2:
+                 fileExtension = ".bz2";
+                 break;
+             default:
+                 fileExtension = "";
+                 break;
+         }
+ 
+         try {
+             flowFile = session.write(flowFile, new StreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn, final 
OutputStream rawOut) throws IOException {
+                     final OutputStream compressionOut;
+                     final InputStream compressionIn;
+ 
+                     final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut, 65536);
+                     final InputStream bufferedIn = new 
BufferedInputStream(rawIn, 65536);
+ 
+                     try {
+                         if (MODE_COMPRESS.equalsIgnoreCase(compressionMode)) {
+                             compressionIn = bufferedIn;
+ 
+                             switch (compressionFormat.toLowerCase()) {
+                                 case COMPRESSION_FORMAT_GZIP:
+                                     final int compressionLevel = 
context.getProperty(COMPRESSION_LEVEL).asInteger();
+                                     compressionOut = new 
GZIPOutputStream(bufferedOut, compressionLevel);
+                                     mimeTypeRef.set("application/gzip");
+                                     break;
+                                 case COMPRESSION_FORMAT_LZMA:
+                                     compressionOut = new 
LzmaOutputStream.Builder(bufferedOut).build();
+                                     mimeTypeRef.set("application/x-lzma");
+                                     break;
+                                 case COMPRESSION_FORMAT_XZ_LZMA2:
+                                     compressionOut = new 
XZOutputStream(bufferedOut, new LZMA2Options());
+                                     mimeTypeRef.set("application/x-xz");
+                                     break;
+                                 case COMPRESSION_FORMAT_BZIP2:
+                                 default:
+                                     mimeTypeRef.set("application/bzip2");
+                                     compressionOut = new 
CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(),
 bufferedOut);
+                                     break;
+                             }
+                         } else {
+                             compressionOut = bufferedOut;
+                             switch (compressionFormat.toLowerCase()) {
+                                 case COMPRESSION_FORMAT_LZMA:
+                                     compressionIn = new 
LzmaInputStream(bufferedIn, new Decoder());
+                                     break;
+                                 case COMPRESSION_FORMAT_XZ_LZMA2:
+                                     compressionIn = new 
XZInputStream(bufferedIn);
+                                     break;
+                                 case COMPRESSION_FORMAT_BZIP2:
+                                     // need this two-arg constructor to 
support concatenated streams
+                                     compressionIn = new 
BZip2CompressorInputStream(bufferedIn, true);
+                                     break;
+                                 case COMPRESSION_FORMAT_GZIP:
+                                     compressionIn = new 
GzipCompressorInputStream(bufferedIn, true);
+                                     break;
+                                 default:
+                                     compressionIn = new 
CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(),
 bufferedIn);
+                             }
+                         }
+                     } catch (final Exception e) {
+                         closeQuietly(bufferedOut);
+                         throw new IOException(e);
+                     }
+ 
+                     try (final InputStream in = compressionIn;
+                             final OutputStream out = compressionOut) {
+                         final byte[] buffer = new byte[8192];
+                         int len;
+                         while ((len = in.read(buffer)) > 0) {
+                             out.write(buffer, 0, len);
+                         }
+                         out.flush();
+                     }
+                 }
+             });
+             stopWatch.stop();
+ 
+             final long sizeAfterCompression = flowFile.getSize();
+             if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) {
+                 flowFile = session.removeAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key());
+ 
+                 if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
+                     final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                     if (filename.toLowerCase().endsWith(fileExtension)) {
+                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), filename.substring(0, filename.length() - 
fileExtension.length()));
+                     }
+                 }
+             } else {
+                 flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
+ 
+                 if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
+                     final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                     flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), filename + fileExtension);
+                 }
+             }
+ 
+             logger.info("Successfully {}ed {} using {} compression format; 
size changed from {} to {} bytes", new Object[]{
+                 compressionMode.toLowerCase(), flowFile, compressionFormat, 
sizeBeforeCompression, sizeAfterCompression});
+             session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getDuration(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (final Exception e) {
+             logger.error("Unable to {} {} using {} compression format due to 
{}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, 
compressionFormat, e});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ 
+     private void closeQuietly(final Closeable closeable) {
+         if (closeable != null) {
+             try {
+                 closeable.close();
+             } catch (final Exception e) {
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index 0000000,a965442..2c1a864
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@@ -1,0 -1,381 +1,381 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerSerially;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerSerially;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.timebuffer.EntityAccess;
+ import org.apache.nifi.util.timebuffer.TimedBuffer;
+ 
+ @SideEffectFree
+ @TriggerSerially
+ @Tags({"rate control", "throttle", "rate", "throughput"})
+ @CapabilityDescription("Controls the rate at which data is transferred to 
follow-on processors.")
+ public class ControlRate extends AbstractProcessor {
+ 
+     public static final String DATA_RATE = "data rate";
+     public static final String FLOWFILE_RATE = "flowfile count";
+     public static final String ATTRIBUTE_RATE = "attribute value";
+ 
+     public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new 
PropertyDescriptor.Builder()
+             .name("Rate Control Criteria")
+             .description("Indicates the criteria that is used to control the 
throughput rate. Changing this value resets the rate counters.")
+             .required(true)
+             .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+             .defaultValue(DATA_RATE)
+             .build();
+     public static final PropertyDescriptor MAX_RATE = new 
PropertyDescriptor.Builder()
+             .name("Maximum Rate")
+             .description("The maximum rate at which data should pass through 
this processor. The format of this property is expected to be a positive 
integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 
'data rate'.")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // 
validated in customValidate b/c dependent on Rate Control Criteria
+             .build();
+     public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+             .name("Rate Controlled Attribute")
+             .description("The name of an attribute whose values build toward 
the rate limit if Rate Control Criteria is set to 'attribute value'. The value 
of the attribute referenced by this property must be a positive integer, or the 
FlowFile will be routed to failure. This value is ignored if Rate Control 
Criteria is not set to 'attribute value'. Changing this value resets the rate 
counters.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(false)
+             .build();
+     public static final PropertyDescriptor TIME_PERIOD = new 
PropertyDescriptor.Builder()
+             .name("Time Duration")
+             .description("The amount of time to which the Maximum Data Size 
and Maximum Number of Files pertains. Changing this value resets the rate 
counters.")
+             .required(true)
+             .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+             .defaultValue("1 min")
+             .build();
+     public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+             .name("Grouping Attribute")
+             .description("By default, a single \"throttle\" is used for all 
FlowFiles. If this value is specified, a separate throttle is used for each 
value specified by the attribute with this name. Changing this value resets the 
rate counters.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(false)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All FlowFiles are 
transferred to this relationship").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("FlowFiles will be routed to 
this relationship if they are missing a necessary attribute or the attribute is 
not in the expected format").build();
+ 
+     private static final Pattern POSITIVE_LONG_PATTERN = 
Pattern.compile("0*[1-9][0-9]*");
+     private static final String DEFAULT_GROUP_ATTRIBUTE = 
ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+ 
+     private final ConcurrentMap<String, Throttle> throttleMap = new 
ConcurrentHashMap<>();
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+     private final AtomicLong lastThrottleClearTime = new 
AtomicLong(System.currentTimeMillis());
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(RATE_CONTROL_CRITERIA);
+         properties.add(MAX_RATE);
+         properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+         properties.add(TIME_PERIOD);
+         properties.add(GROUPING_ATTRIBUTE_NAME);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+         final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(context));
+ 
+         final Validator rateValidator;
+         switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+             case DATA_RATE:
+                 rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+                 break;
+             case ATTRIBUTE_RATE:
+                 rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                 final String rateAttr = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+                 if (rateAttr == null) {
+                     validationResults.add(new 
ValidationResult.Builder().subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).explanation("<Rate
 Controlled Attribute> property must be set if using <Rate Control Criteria> of 
'attribute value'").build());
+                 }
+                 break;
+             case FLOWFILE_RATE:
+             default:
+                 rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                 break;
+         }
+ 
+         final ValidationResult rateResult = rateValidator.validate("Maximum 
Rate", context.getProperty(MAX_RATE).getValue(), null);
+         if (!rateResult.isValid()) {
+             validationResults.add(rateResult);
+         }
+ 
+         return validationResults;
+     }
+ 
+     @Override
+     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+         super.onPropertyModified(descriptor, oldValue, newValue);
+ 
+         if (descriptor.equals(RATE_CONTROL_CRITERIA) || 
descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || 
descriptor.equals(GROUPING_ATTRIBUTE_NAME) || descriptor.equals(TIME_PERIOD)) {
+             // if the criteria that is being used to determine 
limits/throttles is changed, we must clear our throttle map.
+             throttleMap.clear();
+         } else if (descriptor.equals(MAX_RATE)) {
+             final long newRate;
+             if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+                 newRate = DataUnit.parseDataSize(newValue, 
DataUnit.B).longValue();
+             } else {
+                 newRate = Long.parseLong(newValue);
+             }
+ 
+             for (final Throttle throttle : throttleMap.values()) {
+                 throttle.setMaxRate(newRate);
+             }
+         }
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+         final long lastClearTime = lastThrottleClearTime.get();
+         final long throttleExpirationMillis = System.currentTimeMillis() - 2 
* context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+         if (lastClearTime < throttleExpirationMillis) {
+             if (lastThrottleClearTime.compareAndSet(lastClearTime, 
System.currentTimeMillis())) {
+                 final Iterator<Map.Entry<String, Throttle>> itr = 
throttleMap.entrySet().iterator();
+                 while (itr.hasNext()) {
+                     final Map.Entry<String, Throttle> entry = itr.next();
+                     final Throttle throttle = entry.getValue();
+                     if (throttle.tryLock()) {
+                         try {
+                             if (throttle.lastUpdateTime() < lastClearTime) {
+                                 itr.remove();
+                             }
+                         } finally {
+                             throttle.unlock();
+                         }
+                     }
+                 }
+             }
+         }
+ 
+         // TODO: Should periodically clear any Throttle that has not been 
used in more than 2 throttling periods
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final long seconds = 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+         final String rateControlAttributeName = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+         long rateValue;
+         switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+             case DATA_RATE:
+                 rateValue = flowFile.getSize();
+                 break;
+             case FLOWFILE_RATE:
+                 rateValue = 1;
+                 break;
+             case ATTRIBUTE_RATE:
+                 final String attributeValue = 
flowFile.getAttribute(rateControlAttributeName);
+                 if (attributeValue == null) {
+                     logger.error("routing {} to 'failure' because FlowFile is 
missing required attribute {}",
+                             new Object[]{flowFile, rateControlAttributeName});
+                     session.transfer(flowFile, REL_FAILURE);
+                     return;
+                 }
+ 
+                 if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) 
{
+                     logger.error("routing {} to 'failure' because FlowFile 
attribute {} has a value of {}, which is not a positive integer",
+                             new Object[]{flowFile, rateControlAttributeName, 
attributeValue});
+                     session.transfer(flowFile, REL_FAILURE);
+                     return;
+                 }
+                 rateValue = Long.parseLong(attributeValue);
+                 break;
+             default:
+                 throw new AssertionError("<Rate Control Criteria> property 
set to illegal value of " + 
context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+         }
+ 
+         final String groupingAttributeName = 
context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+         final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+         Throttle throttle = throttleMap.get(groupName);
+         if (throttle == null) {
+             throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+ 
+             final String maxRateValue = 
context.getProperty(MAX_RATE).getValue();
+             final long newRate;
+             if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+                 newRate = DataUnit.parseDataSize(maxRateValue, 
DataUnit.B).longValue();
+             } else {
+                 newRate = Long.parseLong(maxRateValue);
+             }
+             throttle.setMaxRate(newRate);
+ 
+             throttleMap.put(groupName, throttle);
+         }
+ 
+         throttle.lock();
+         try {
+             if (throttle.tryAdd(rateValue)) {
+                 logger.info("transferring {} to 'success'", new 
Object[]{flowFile});
+                 session.transfer(flowFile, REL_SUCCESS);
+             } else {
+                 flowFile = session.penalize(flowFile);
+                 session.transfer(flowFile);
+             }
+         } finally {
+             throttle.unlock();
+         }
+     }
+ 
+     private static class TimestampedLong {
+ 
+         private final Long value;
+         private final long timestamp = System.currentTimeMillis();
+ 
+         public TimestampedLong(final Long value) {
+             this.value = value;
+         }
+ 
+         public Long getValue() {
+             return value;
+         }
+ 
+         public long getTimestamp() {
+             return timestamp;
+         }
+     }
+ 
+     private static class RateEntityAccess implements 
EntityAccess<TimestampedLong> {
+ 
+         @Override
+         public TimestampedLong aggregate(TimestampedLong oldValue, 
TimestampedLong toAdd) {
+             if (oldValue == null && toAdd == null) {
+                 return new TimestampedLong(0L);
+             } else if (oldValue == null) {
+                 return toAdd;
+             } else if (toAdd == null) {
+                 return oldValue;
+             }
+ 
+             return new TimestampedLong(oldValue.getValue() + 
toAdd.getValue());
+         }
+ 
+         @Override
+         public TimestampedLong createNew() {
+             return new TimestampedLong(0L);
+         }
+ 
+         @Override
+         public long getTimestamp(TimestampedLong entity) {
+             return entity == null ? 0L : entity.getTimestamp();
+         }
+     }
+ 
+     private static class Throttle extends ReentrantLock {
+ 
+         private final AtomicLong maxRate = new AtomicLong(1L);
+         private final long timePeriodValue;
+         private final TimeUnit timePeriodUnit;
+         private final TimedBuffer<TimestampedLong> timedBuffer;
+         private final ProcessorLog logger;
+ 
+         private volatile long penalizationExpired;
+         private volatile long lastUpdateTime;
+ 
+         public Throttle(final int timePeriod, final TimeUnit unit, final 
ProcessorLog logger) {
+             this.timePeriodUnit = unit;
+             this.timePeriodValue = timePeriod;
+             this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new 
RateEntityAccess());
+             this.logger = logger;
+         }
+ 
+         public void setMaxRate(final long maxRate) {
+             this.maxRate.set(maxRate);
+         }
+ 
+         public long lastUpdateTime() {
+             return lastUpdateTime;
+         }
+ 
+         public boolean tryAdd(final long value) {
+             final long now = System.currentTimeMillis();
+             if (penalizationExpired > now) {
+                 return false;
+             }
+ 
+             final long maxRateValue = maxRate.get();
+ 
+             final TimestampedLong sum = 
timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, 
timePeriodUnit));
+             if (sum != null && sum.getValue() >= maxRateValue) {
+                 logger.debug("current sum for throttle is {}, so not allowing 
rate of {} through", new Object[]{sum.getValue(), value});
+                 return false;
+             }
+ 
+             logger.debug("current sum for throttle is {}, so allowing rate of 
{} through", new Object[]{sum == null ? 0 : sum.getValue(), value});
+ 
+             final long transferred = timedBuffer.add(new 
TimestampedLong(value)).getValue();
+             if (transferred > maxRateValue) {
+                 final long amountOver = transferred - maxRateValue;
+                 // determine how long it should take to transfer 'amountOver' 
and 'penalize' the Throttle for that long
+                 final long milliDuration = 
TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+                 final double pct = (double) amountOver / (double) 
maxRateValue;
+                 final long penalizationPeriod = (long) (milliDuration * pct);
+                 this.penalizationExpired = now + penalizationPeriod;
+                 logger.debug("allowing rate of {} through but penalizing 
Throttle for {} milliseconds", new Object[]{value, penalizationPeriod});
+             }
+ 
+             lastUpdateTime = now;
+             return true;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
index 0000000,d67179f..119a3f2
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java
@@@ -1,0 -1,175 +1,175 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import java.io.*;
+ import java.nio.CharBuffer;
+ import java.nio.charset.Charset;
+ import java.nio.charset.CharsetDecoder;
+ import java.nio.charset.CharsetEncoder;
+ import java.nio.charset.CodingErrorAction;
+ import java.util.*;
+ import java.util.concurrent.TimeUnit;
+ 
+ /**
+  * <p>
+  * This processor reads files in as text according to the specified character
+  * set and it outputs another text file according to the given characeter set.
+  * The character sets supported depend on the version of the JRE and is 
platform
+  * specific. In addition, the JVM can be expanded with additional character 
sets
+  * to support. More information on which character sets are supported can be
+  * found in the JDK documentation under the docs directory in the following
+  * path: ....\technotes\guides\intl\encoding.doc.html</p>
+  *
+  * <p>
+  * The conversion process is very passive. For conversions that do not map
+  * perfectly the conversion will replace unmappable or unrecognized input 
using
+  * the '?' character.
+  *
+  * <p>
+  * The following properties are required: <ul> <li><b>input.charset</b> - The
+  * character set of the original file contents</li> <li><b>output.charset</b> 
-
+  * The character set of the resulting file</li> </ul> </p>
+  *
+  * <p>
+  * The following properties are optional: <ul> <li><b>N/A</b> - </li> </ul>
+  * </p>
+  *
+  * <p>
+  * The following relationships are required: <ul> <li><b>success</b> - the id 
of
+  * the processor to transfer successfully converted files</li>
+  * <li><b>failure</b> - the id of the processor to transfer unsuccessfully
+  * converted files</li> </ul> </p>
+  */
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"text", "convert", "characterset", "character set"})
+ @CapabilityDescription("Converts a FlowFile's content from one character set 
to another")
+ public class ConvertCharacterSet extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor INPUT_CHARSET = new 
PropertyDescriptor.Builder()
+             .name("Input Character Set")
+             .description("The name of the CharacterSet to expect for Input")
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .required(true)
+             .build();
+     public static final PropertyDescriptor OUTPUT_CHARSET = new 
PropertyDescriptor.Builder()
+             .name("Output Character Set")
+             .description("The name of the CharacterSet to convert to")
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .required(true)
+             .build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("").build();
+ 
+     public static final int MAX_BUFFER_SIZE = 512 * 1024;
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> properties;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(INPUT_CHARSET);
+         properties.add(OUTPUT_CHARSET);
+         this.properties = Collections.unmodifiableList(properties);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final ProcessorLog logger = getLogger();
+ 
+         final Charset inputCharset = 
Charset.forName(context.getProperty(INPUT_CHARSET).getValue());
+         final Charset outputCharset = 
Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue());
+         final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE);
+ 
+         final CharsetDecoder decoder = inputCharset.newDecoder();
+         decoder.onMalformedInput(CodingErrorAction.REPLACE);
+         decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+         decoder.replaceWith("?");
+ 
+         final CharsetEncoder encoder = outputCharset.newEncoder();
+         encoder.onMalformedInput(CodingErrorAction.REPLACE);
+         encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+         encoder.replaceWith("?".getBytes(outputCharset));
+ 
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         try {
+             final StopWatch stopWatch = new StopWatch(true);
+             flowFile = session.write(flowFile, new StreamCallback() {
+                 @Override
+                 public void process(final InputStream rawIn, final 
OutputStream rawOut) throws IOException {
+                     try (final BufferedReader reader = new BufferedReader(new 
InputStreamReader(rawIn, decoder), MAX_BUFFER_SIZE);
+                             final BufferedWriter writer = new 
BufferedWriter(new OutputStreamWriter(rawOut, encoder), MAX_BUFFER_SIZE)) {
+                         int charsRead;
+                         while ((charsRead = reader.read(charBuffer)) != -1) {
+                             charBuffer.flip();
+                             writer.write(charBuffer.array(), 0, charsRead);
+                         }
+ 
+                         writer.flush();
+                     }
+                 }
+             });
+ 
+             session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             logger.info("successfully converted characters from {} to {} for 
{}", new Object[]{
+                 context.getProperty(INPUT_CHARSET).getValue(), 
context.getProperty(OUTPUT_CHARSET).getValue(), flowFile});
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (final Exception e) {
+             throw new ProcessException(e);
+         }
+     }
+ 
+ }

Reply via email to