/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;

/**
 * Publishes the content of incoming FlowFiles to an Apache Kafka topic.
 *
 * <p>If a Message Delimiter is configured, the FlowFile content is split on that
 * delimiter and each section is sent as its own Kafka message (batched up to the
 * configured Max Buffer Size); otherwise the entire FlowFile content is sent as a
 * single message. Producers are pooled in a queue so that they can be reused
 * across onTrigger invocations.
 */
@SupportsBatching
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
    // A single broker is <anything>:<3-to-5-digit port>; the full property is a comma-separated list of these.
    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

    // The AllowableValue values map directly onto Kafka's "request.required.acks" settings (-1, 1, 0).
    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");

    public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
        .name("Known Brokers")
        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
        .required(true)
        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
        .expressionLanguageSupported(false)
        .build();
    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
        .name("Topic Name")
        .description("The Kafka Topic of interest")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();
    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
        .name("Kafka Key")
        .description("The Key to use for the Message")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();
    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
        .name("Delivery Guarantee")
        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
        .required(true)
        .expressionLanguageSupported(false)
        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
        .build();
    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
        .name("Message Delimiter")
        .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
            + "If not specified, the entire content of the FlowFile will be used as a single message. "
            + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
            + "sent as a separate Kafka message.")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();
    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
        .name("Max Buffer Size")
        .description("The maximum amount of data to buffer in memory before sending to Kafka")
        .required(true)
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .expressionLanguageSupported(false)
        .defaultValue("1 MB")
        .build();
    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
        .name("Communications Timeout")
        .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
        .required(true)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(false)
        .defaultValue("30 secs")
        .build();
    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
        .name("Client Name")
        .description("Client Name to use when communicating with Kafka")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(false)
        .build();


    public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
        .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
        .build();

    // Pool of reusable producers; producers that experienced a send failure are closed rather than returned.
    private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // CLIENT_NAME's default incorporates the processor's identifier, which is only known at runtime,
        // so the descriptor is re-built here rather than using the static constant directly.
        final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
            .fromPropertyDescriptor(CLIENT_NAME)
            .defaultValue("NiFi-" + getIdentifier())
            .build();

        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(SEED_BROKERS);
        props.add(TOPIC);
        props.add(KEY);
        props.add(DELIVERY_GUARANTEE);
        props.add(MESSAGE_DELIMITER);
        props.add(MAX_BUFFER_SIZE);
        props.add(TIMEOUT);
        props.add(clientName);
        return props;
    }

    @Override
    public Set<Relationship> getRelationships() {
        // Sized for the two relationships actually added (was previously 1).
        final Set<Relationship> relationships = new HashSet<>(2);
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        return relationships;
    }


    /**
     * Closes every pooled producer when the processor is stopped so that no
     * network resources are leaked between schedule cycles.
     */
    @OnStopped
    public void closeProducers() {
        Producer<byte[], byte[]> producer;

        while ((producer = producers.poll()) != null) {
            producer.close();
        }
    }

    /**
     * Builds the Kafka ProducerConfig from the processor's property values.
     * Retries are limited to 1 and the producer is synchronous so that a send
     * failure surfaces immediately in onTrigger.
     */
    protected ProducerConfig createConfig(final ProcessContext context) {
        final String brokers = context.getProperty(SEED_BROKERS).getValue();

        final Properties properties = new Properties();
        properties.setProperty("metadata.broker.list", brokers);
        properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
        properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
        properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));

        properties.setProperty("message.send.max.retries", "1");
        properties.setProperty("producer.type", "sync");

        return new ProducerConfig(properties);
    }

    /** Creates a new producer; overridable so that tests can supply a mock. */
    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
        return new Producer<>(createConfig(context));
    }

    /** Takes a producer from the pool, creating a fresh one if the pool is empty. */
    private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
        Producer<byte[], byte[]> producer = producers.poll();
        return producer == null ? createProducer(context) : producer;
    }

    /** Returns a healthy producer to the pool for reuse. */
    private void returnProducer(final Producer<byte[], byte[]> producer) {
        producers.offer(producer);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }

        final long start = System.nanoTime();
        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
        final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
        if ( delimiter != null ) {
            // Allow users to type escape sequences such as "\n" into the property value.
            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        }

        final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
        final Producer<byte[], byte[]> producer = borrowProducer(context);

        if ( delimiter == null ) {
            // Send the entire FlowFile as a single message.
            final byte[] value = new byte[(int) flowFile.getSize()];
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(final InputStream in) throws IOException {
                    StreamUtils.fillBuffer(in, value);
                }
            });

            boolean error = false;
            try {
                final KeyedMessage<byte[], byte[]> message;
                if ( key == null ) {
                    message = new KeyedMessage<>(topic, value);
                } else {
                    message = new KeyedMessage<>(topic, keyBytes, value);
                }

                producer.send(message);
                final long nanos = System.nanoTime() - start;

                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
                session.transfer(flowFile, REL_SUCCESS);
                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
            } catch (final Exception e) {
                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
                session.transfer(flowFile, REL_FAILURE);
                error = true;
            } finally {
                // A producer that failed may be in a bad state, so discard it instead of pooling it.
                if ( error ) {
                    producer.close();
                } else {
                    returnProducer(producer);
                }
            }
        } else {
            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);

            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
            // if it matches some pattern. We can use this to search for the delimiter as we read through
            // the stream of bytes in the FlowFile
            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);

            boolean error = false;
            final LongHolder lastMessageOffset = new LongHolder(0L);
            final LongHolder messagesSent = new LongHolder(0L);

            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                session.read(flowFile, new InputStreamCallback() {
                    @Override
                    public void process(final InputStream rawIn) throws IOException {
                        byte[] data = null; // contents of a single message

                        boolean streamFinished = false;

                        final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
                        long messageBytes = 0L; // size of messages in the 'messages' list

                        int nextByte;
                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
                            final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {

                            // read until we're out of data.
                            while (!streamFinished) {
                                nextByte = in.read();

                                if ( nextByte > -1 ) {
                                    baos.write(nextByte);
                                }

                                if (nextByte == -1) {
                                    // we ran out of data. This message is complete.
                                    data = baos.toByteArray();
                                    streamFinished = true;
                                } else if ( buffer.addAndCompare((byte) nextByte) ) {
                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
                                    // the delimiter itself to be sent.
                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
                                }

                                createMessage: if ( data != null ) {
                                    // If the message has no data, ignore it.
                                    if ( data.length == 0 ) {
                                        data = null;
                                        baos.reset();
                                        break createMessage;
                                    }

                                    // either we ran out of data or we reached the end of the message.
                                    // Either way, create the message because it's ready to send.
                                    final KeyedMessage<byte[], byte[]> message;
                                    if ( key == null ) {
                                        message = new KeyedMessage<>(topic, data);
                                    } else {
                                        message = new KeyedMessage<>(topic, keyBytes, data);
                                    }

                                    // Add the message to the list of messages ready to send. If we've reached our
                                    // threshold of how many we're willing to send (or if we're out of data), go ahead
                                    // and send the whole List.
                                    messages.add(message);
                                    messageBytes += data.length;
                                    if ( messageBytes >= maxBufferSize || streamFinished ) {
                                        // send the messages, then reset our state.
                                        try {
                                            producer.send(messages);
                                        } catch (final Exception e) {
                                            // we wrap the general exception in ProcessException because we want to separate
                                            // failures in sending messages from general Exceptions that would indicate bugs
                                            // in the Processor. Failure to send a message should be handled appropriately, but
                                            // we don't want to catch the general Exception or RuntimeException in order to catch
                                            // failures from Kafka's Producer.
                                            throw new ProcessException("Failed to send messages to Kafka", e);
                                        }

                                        messagesSent.addAndGet(messages.size()); // count number of messages sent

                                        // reset state
                                        messages.clear();
                                        messageBytes = 0;

                                        // We've successfully sent a batch of messages. Keep track of the byte offset in the
                                        // FlowFile of the last successfully sent message. This way, if the messages cannot
                                        // all be successfully sent, we know where to split off the data. This allows us to then
                                        // split off the first X number of bytes and send to 'success' and then split off the rest
                                        // and send them to 'failure'.
                                        lastMessageOffset.set(in.getBytesConsumed());
                                    }

                                    // reset BAOS so that we can start a new message.
                                    baos.reset();
                                    data = null;
                                }
                            }

                            // If there are messages left, send them
                            if ( !messages.isEmpty() ) {
                                try {
                                    producer.send(messages);
                                } catch (final Exception e) {
                                    throw new ProcessException("Failed to send messages to Kafka", e);
                                }
                            }
                        }
                    }
                });

                final long nanos = System.nanoTime() - start;

                session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
                session.transfer(flowFile, REL_SUCCESS);
                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
            } catch (final ProcessException pe) {
                error = true;

                // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can
                // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
                // 'success' while we send the others to 'failure'.
                final long offset = lastMessageOffset.get();
                if ( offset == 0L ) {
                    // all of the messages failed to send. Route FlowFile to failure
                    getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, pe.getCause()});
                    session.transfer(flowFile, REL_FAILURE);
                } else {
                    // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
                    final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
                    final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);

                    getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
                        messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });

                    session.transfer(successfulMessages, REL_SUCCESS);
                    session.transfer(failedMessages, REL_FAILURE);
                    session.remove(flowFile);
                    session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
                }
            } finally {
                if ( error ) {
                    producer.close();
                } else {
                    returnProducer(producer);
                }
            }

        }
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.kafka;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;


/**
 * Unit tests for {@link PutKafka}, using a mock Kafka producer that records the
 * messages it is asked to send and can be configured to fail after a given number
 * of successful sends.
 */
public class TestPutKafka {

    @Test
    public void testMultipleKeyValuePerFlowFile() {
        final TestableProcessor processor = new TestableProcessor();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PutKafka.TOPIC, "topic1");
        runner.setProperty(PutKafka.KEY, "key1");
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");

        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);

        // One Kafka message per delimited section, in order.
        final String[] expected = {"Hello World", "Goodbye", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
        final List<byte[]> messages = processor.getProducer().getMessages();
        assertEquals(expected.length, messages.size());
        for (int i = 0; i < expected.length; i++) {
            assertTrue(Arrays.equals(expected[i].getBytes(StandardCharsets.UTF_8), messages.get(i)));
        }
    }


    @Test
    public void testWithImmediateFailure() {
        // Producer fails on the very first send, so the whole FlowFile goes to failure untouched.
        final TestableProcessor processor = new TestableProcessor(0);
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PutKafka.TOPIC, "topic1");
        runner.setProperty(PutKafka.KEY, "key1");
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");

        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
        runner.enqueue(text.getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
        mff.assertContentEquals(text);
    }


    @Test
    public void testPartialFailure() {
        // First 2 sends succeed, the rest fail; the FlowFile should be split at the byte
        // offset of the last successful message.
        final TestableProcessor processor = new TestableProcessor(2);
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PutKafka.TOPIC, "topic1");
        runner.setProperty(PutKafka.KEY, "key1");
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");

        runner.enqueue("1\n2\n3\n4".getBytes());
        runner.run();

        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);

        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
        successFF.assertContentEquals("1\n2\n");

        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
        failureFF.assertContentEquals("3\n4");
    }


    @Test
    public void testWithEmptyMessages() {
        final TestableProcessor processor = new TestableProcessor();
        final TestRunner runner = TestRunners.newTestRunner(processor);
        runner.setProperty(PutKafka.TOPIC, "topic1");
        runner.setProperty(PutKafka.KEY, "key1");
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");

        // Runs of consecutive delimiters produce zero-length sections, which must be skipped.
        runner.enqueue("\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes());
        runner.run();

        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);

        final String[] expected = {"1", "2", "3", "4"};
        final List<byte[]> msgs = processor.getProducer().getMessages();
        assertEquals(expected.length, msgs.size());
        for (int i = 0; i < expected.length; i++) {
            assertTrue(Arrays.equals(expected[i].getBytes(), msgs.get(i)));
        }
    }


    @Test
    @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
    public void testKeyValuePut() {
        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
        runner.setProperty(PutKafka.KEY, "${kafka.key}");
        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("kafka.topic", "test");
        attributes.put("kafka.key", "key3");

        final byte[] data = "Hello, World, Again! ;)".getBytes();
        runner.enqueue(data, attributes);
        runner.enqueue(data, attributes);
        runner.enqueue(data, attributes);
        runner.enqueue(data, attributes);

        runner.run(5);

        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
        final MockFlowFile mff = mffs.get(0);

        assertTrue(Arrays.equals(data, mff.toByteArray()));
    }


    /** PutKafka subclass whose producer is replaced by a recording {@link MockProducer}. */
    private static class TestableProcessor extends PutKafka {
        private MockProducer producer;
        private int failAfter = Integer.MAX_VALUE;

        public TestableProcessor() {
        }

        public TestableProcessor(final int failAfter) {
            this.failAfter = failAfter;
        }

        @OnScheduled
        public void instantiateProducer(final ProcessContext context) {
            producer = new MockProducer(createConfig(context));
            producer.setFailAfter(failAfter);
        }

        @Override
        protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
            return producer;
        }

        public MockProducer getProducer() {
            return producer;
        }
    }


    /**
     * Producer stand-in that records message payloads and throws a
     * {@link FailedToSendMessageException} once the configured number of
     * successful sends has been exceeded.
     */
    private static class MockProducer extends Producer<byte[], byte[]> {
        private int sendCount = 0;
        private int failAfter = Integer.MAX_VALUE;

        private final List<byte[]> messages = new ArrayList<>();

        public MockProducer(final ProducerConfig config) {
            super(config);
        }

        @Override
        public void send(final KeyedMessage<byte[], byte[]> message) {
            if ( ++sendCount > failAfter ) {
                throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
            } else {
                messages.add(message.message());
            }
        }

        public List<byte[]> getMessages() {
            return messages;
        }

        @Override
        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
            // Batch sends are delegated one message at a time so the fail-after
            // accounting is shared with the single-message path.
            for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
                send(msg);
            }
        }

        public void setFailAfter(final int successCount) {
            failAfter = successCount;
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.util.StopWatch;

/**
 * Encodes or decodes FlowFile content as base64, selected by the Mode property.
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"experimental", "encode", "base64"})
@CapabilityDescription("Encodes the FlowFile content in base64")
public class Base64EncodeContent extends AbstractProcessor {

    public static final String ENCODE_MODE = "Encode";
    public static final String DECODE_MODE = "Decode";

    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
        .name("Mode")
        .description("Specifies whether the content should be encoded or decoded")
        .required(true)
        .allowableValues(ENCODE_MODE, DECODE_MODE)
        .defaultValue(ENCODE_MODE)
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encoded or decoded will be routed to success").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encoded or decoded will be routed to failure").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        // Single supported property and the success/failure pair, both exposed immutably.
        final List<PropertyDescriptor> descriptorList = new ArrayList<>();
        descriptorList.add(MODE);
        this.properties = Collections.unmodifiableList(descriptorList);

        final Set<Relationship> relationshipSet = new HashSet<>();
        relationshipSet.add(REL_SUCCESS);
        relationshipSet.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationshipSet);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = getLogger();

        final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
        try {
            final StopWatch stopWatch = new StopWatch(true);
            if (encode) {
                // Encoding: wrap the destination so raw bytes are written out as base64.
                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(InputStream in, OutputStream out) throws IOException {
                        try (Base64OutputStream bos = new Base64OutputStream(out)) {
                            pump(in, bos);
                        }
                    }
                });
            } else {
                // Decoding: validate the incoming base64 before decoding it back to raw bytes.
                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(InputStream in, OutputStream out) throws IOException {
                        try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
                            pump(bis, out);
                        }
                    }
                });
            }

            logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    // Copies src to dst in 8 KiB chunks and flushes dst; shared by the encode and decode paths.
    private static void pump(final InputStream src, final OutputStream dst) throws IOException {
        final byte[] chunk = new byte[8192];
        int count;
        while ((count = src.read(chunk)) > 0) {
            dst.write(chunk, 0, count);
        }
        dst.flush();
    }

}
+ */ + package org.apache.nifi.processors.standard; + + import java.io.Closeable; + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.TimeUnit; + + import lzma.sdk.lzma.Decoder; + import lzma.streams.LzmaInputStream; + import lzma.streams.LzmaOutputStream; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.stream.io.BufferedInputStream; + import org.apache.nifi.stream.io.BufferedOutputStream; + import org.apache.nifi.stream.io.GZIPOutputStream; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.util.ObjectHolder; + import org.apache.nifi.util.StopWatch; + + import org.apache.commons.compress.compressors.CompressorStreamFactory; + import 
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate")
public class CompressContent extends AbstractProcessor {

    // Sentinel format value: resolve the actual format from the FlowFile's mime.type attribute at trigger time.
    public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";
    public static final String COMPRESSION_FORMAT_GZIP = "gzip";
    public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
    public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
    public static final String COMPRESSION_FORMAT_LZMA = "lzma";

    public static final String MODE_COMPRESS = "compress";
    public static final String MODE_DECOMPRESS = "decompress";

    public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
        .name("Compression Format")
        .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA")
        .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA)
        .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
        .required(true)
        .build();
    // Only consulted for the GZIP format; the other codecs ignore it (see onTrigger).
    public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
        .name("Compression Level")
        .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing but less compression; a value of 0 indicates no compression but simply archiving")
        .defaultValue("1")
        .required(true)
        .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
        .build();
    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
        .name("Mode")
        .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
        .allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
        .defaultValue(MODE_COMPRESS)
        .required(true)
        .build();
    public static final PropertyDescriptor UPDATE_FILENAME = new PropertyDescriptor.Builder()
        .name("Update Filename")
        .description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate compression format) and add the appropriate extension when compressing data")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("false")
        .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    // Maps a mime.type attribute value to one of the COMPRESSION_FORMAT_* constants,
    // used when the format property is set to "use mime.type attribute".
    private Map<String, String> compressionFormatMimeTypeMap;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(MODE);
        properties.add(COMPRESSION_FORMAT);
        properties.add(COMPRESSION_LEVEL);
        properties.add(UPDATE_FILENAME);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);

        // NOTE(review): only gzip, bzip2, and lzma mime types are mapped here; there is no
        // entry for xz-lzma2, so mime-type-driven decompression of xz content will route to
        // success without decompressing -- confirm whether that is intended.
        final Map<String, String> mimeTypeMap = new HashMap<>();
        mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
        mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
        mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
        this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Compresses or decompresses one FlowFile per invocation. Resolves the compression
     * format (from the property or the mime.type attribute), rewrites the content through
     * the matching codec stream, then updates mime.type and, optionally, the filename
     * extension. Any failure routes the FlowFile to {@link #REL_FAILURE}.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ProcessorLog logger = getLogger();
        final long sizeBeforeCompression = flowFile.getSize();
        final String compressionMode = context.getProperty(MODE).getValue();

        String compressionFormatValue = context.getProperty(COMPRESSION_FORMAT).getValue();
        if (compressionFormatValue.equals(COMPRESSION_FORMAT_ATTRIBUTE)) {
            // Format is driven by the FlowFile's mime.type attribute.
            final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
            if (mimeType == null) {
                logger.error("No {} attribute exists for {}; routing to failure", new Object[]{CoreAttributes.MIME_TYPE.key(), flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType);
            if (compressionFormatValue == null) {
                // Unknown mime type is not an error: pass the FlowFile through untouched.
                logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing",
                    new Object[]{flowFile, mimeType});
                session.transfer(flowFile, REL_SUCCESS);
                return;
            }
        }

        final String compressionFormat = compressionFormatValue;
        // Set inside the write callback (compress path only); read afterwards to update mime.type.
        final ObjectHolder<String> mimeTypeRef = new ObjectHolder<>(null);
        final StopWatch stopWatch = new StopWatch(true);

        // Filename extension associated with the chosen format, used for the optional
        // filename update after a successful (de)compression.
        final String fileExtension;
        switch (compressionFormat.toLowerCase()) {
            case COMPRESSION_FORMAT_GZIP:
                fileExtension = ".gz";
                break;
            case COMPRESSION_FORMAT_LZMA:
                fileExtension = ".lzma";
                break;
            case COMPRESSION_FORMAT_XZ_LZMA2:
                fileExtension = ".xz";
                break;
            case COMPRESSION_FORMAT_BZIP2:
                fileExtension = ".bz2";
                break;
            default:
                fileExtension = "";
                break;
        }

        try {
            flowFile = session.write(flowFile, new StreamCallback() {
                @Override
                public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                    final OutputStream compressionOut;
                    final InputStream compressionIn;

                    final OutputStream bufferedOut = new BufferedOutputStream(rawOut, 65536);
                    final InputStream bufferedIn = new BufferedInputStream(rawIn, 65536);

                    // Wire exactly one codec stream onto one side of the copy: the output
                    // side when compressing, the input side when decompressing.
                    try {
                        if (MODE_COMPRESS.equalsIgnoreCase(compressionMode)) {
                            compressionIn = bufferedIn;

                            switch (compressionFormat.toLowerCase()) {
                                case COMPRESSION_FORMAT_GZIP:
                                    final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
                                    compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
                                    mimeTypeRef.set("application/gzip");
                                    break;
                                case COMPRESSION_FORMAT_LZMA:
                                    compressionOut = new LzmaOutputStream.Builder(bufferedOut).build();
                                    mimeTypeRef.set("application/x-lzma");
                                    break;
                                case COMPRESSION_FORMAT_XZ_LZMA2:
                                    compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
                                    mimeTypeRef.set("application/x-xz");
                                    break;
                                case COMPRESSION_FORMAT_BZIP2:
                                default:
                                    mimeTypeRef.set("application/bzip2");
                                    compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
                                    break;
                            }
                        } else {
                            compressionOut = bufferedOut;
                            switch (compressionFormat.toLowerCase()) {
                                case COMPRESSION_FORMAT_LZMA:
                                    compressionIn = new LzmaInputStream(bufferedIn, new Decoder());
                                    break;
                                case COMPRESSION_FORMAT_XZ_LZMA2:
                                    compressionIn = new XZInputStream(bufferedIn);
                                    break;
                                case COMPRESSION_FORMAT_BZIP2:
                                    // need this two-arg constructor to support concatenated streams
                                    compressionIn = new BZip2CompressorInputStream(bufferedIn, true);
                                    break;
                                case COMPRESSION_FORMAT_GZIP:
                                    compressionIn = new GzipCompressorInputStream(bufferedIn, true);
                                    break;
                                default:
                                    compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
                            }
                        }
                    } catch (final Exception e) {
                        // Codec construction failed: release the output before rethrowing so the
                        // session can abort the write cleanly.
                        closeQuietly(bufferedOut);
                        throw new IOException(e);
                    }

                    // Copy all content through the codec; closing both streams via
                    // try-with-resources also finalizes any codec trailer bytes.
                    try (final InputStream in = compressionIn;
                            final OutputStream out = compressionOut) {
                        final byte[] buffer = new byte[8192];
                        int len;
                        while ((len = in.read(buffer)) > 0) {
                            out.write(buffer, 0, len);
                        }
                        out.flush();
                    }
                }
            });
            stopWatch.stop();

            final long sizeAfterCompression = flowFile.getSize();
            if (MODE_DECOMPRESS.equalsIgnoreCase(compressionMode)) {
                // The content is no longer compressed, so the old mime.type is stale.
                flowFile = session.removeAttribute(flowFile, CoreAttributes.MIME_TYPE.key());

                if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                    // Strip the codec extension only when the filename actually carries it.
                    final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                    if (filename.toLowerCase().endsWith(fileExtension)) {
                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename.substring(0, filename.length() - fileExtension.length()));
                    }
                }
            } else {
                flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());

                if (context.getProperty(UPDATE_FILENAME).asBoolean()) {
                    final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename + fileExtension);
                }
            }

            logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes", new Object[]{
                compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Exception e) {
            logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    // Best-effort close used on the error path; any close failure is deliberately ignored.
    private void closeQuietly(final Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (final Exception e) {
            }
        }
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.locks.ReentrantLock; + import java.util.regex.Pattern; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerSerially; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import 
org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.behavior.TriggerSerially; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.timebuffer.EntityAccess; + import org.apache.nifi.util.timebuffer.TimedBuffer; + + @SideEffectFree + @TriggerSerially + @Tags({"rate control", "throttle", "rate", "throughput"}) + @CapabilityDescription("Controls the rate at which data is transferred to follow-on processors.") + public class ControlRate extends AbstractProcessor { + + public static final String DATA_RATE = "data rate"; + public static final String FLOWFILE_RATE = "flowfile count"; + public static final String ATTRIBUTE_RATE = "attribute value"; + + public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() + .name("Rate Control Criteria") + .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") + .required(true) + .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE) + .defaultValue(DATA_RATE) + .build(); + public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder() + .name("Maximum Rate") + .description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in customValidate b/c dependent on Rate Control Criteria + .build(); + public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Rate Controlled Attribute") + .description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. 
The value of the attribute referenced by this property must be a positive integer, or the FlowFile will be routed to failure. This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder() + .name("Time Duration") + .description("The amount of time to which the Maximum Data Size and Maximum Number of Files pertains. Changing this value resets the rate counters.") + .required(true) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .defaultValue("1 min") + .build(); + public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("Grouping Attribute") + .description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for each value specified by the attribute with this name. 
Changing this value resets the rate counters.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are transferred to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if they are missing a necessary attribute or the attribute is not in the expected format").build(); + + private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*"); + private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###"; + + private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<>(); + private List<PropertyDescriptor> properties; + private Set<Relationship> relationships; + private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis()); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RATE_CONTROL_CRITERIA); + properties.add(MAX_RATE); + properties.add(RATE_CONTROL_ATTRIBUTE_NAME); + properties.add(TIME_PERIOD); + properties.add(GROUPING_ATTRIBUTE_NAME); + this.properties = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> 
validationResults = new ArrayList<>(super.customValidate(context)); + + final Validator rateValidator; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValidator = StandardValidators.DATA_SIZE_VALIDATOR; + break; + case ATTRIBUTE_RATE: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + final String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + if (rateAttr == null) { + validationResults.add(new ValidationResult.Builder().subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'").build()); + } + break; + case FLOWFILE_RATE: + default: + rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR; + break; + } + + final ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), null); + if (!rateResult.isValid()) { + validationResults.add(rateResult); + } + + return validationResults; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + + if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) || descriptor.equals(TIME_PERIOD)) { + // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. 
+ throttleMap.clear(); + } else if (descriptor.equals(MAX_RATE)) { + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) { + newRate = DataUnit.parseDataSize(newValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(newValue); + } + + for (final Throttle throttle : throttleMap.values()) { + throttle.setMaxRate(newRate); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final long lastClearTime = lastThrottleClearTime.get(); + final long throttleExpirationMillis = System.currentTimeMillis() - 2 * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS); + if (lastClearTime < throttleExpirationMillis) { + if (lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) { + final Iterator<Map.Entry<String, Throttle>> itr = throttleMap.entrySet().iterator(); + while (itr.hasNext()) { + final Map.Entry<String, Throttle> entry = itr.next(); + final Throttle throttle = entry.getValue(); + if (throttle.tryLock()) { + try { + if (throttle.lastUpdateTime() < lastClearTime) { + itr.remove(); + } + } finally { + throttle.unlock(); + } + } + } + } + } + + // TODO: Should periodically clear any Throttle that has not been used in more than 2 throttling periods + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final long seconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS); + final String rateControlAttributeName = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); + long rateValue; + switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) { + case DATA_RATE: + rateValue = flowFile.getSize(); + break; + case FLOWFILE_RATE: + rateValue = 1; + break; + case ATTRIBUTE_RATE: + final String attributeValue = flowFile.getAttribute(rateControlAttributeName); + if (attributeValue == null) { + 
// NOTE(review): commit-digest chunk, reproduced byte-for-byte below ("+ "-prefixed diff text with many
// source lines collapsed per physical line); only these reviewer comments are added. The first span is
// the tail of an onTrigger(...) switch over <Rate Control Criteria> whose beginning is outside this
// chunk: it validates a rate-control attribute, looks up (or lazily creates) a per-group Throttle keyed
// by the grouping attribute, and either transfers to 'success' or penalizes and re-queues the FlowFile
// (session.transfer with no relationship routes back to self — presumably intentional retry; confirm).
logger.error("routing {} to 'failure' because FlowFile is missing required attribute {}", + new Object[]{flowFile, rateControlAttributeName}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) { + logger.error("routing {} to 'failure' because FlowFile attribute {} has a value of {}, which is not a positive integer", + new Object[]{flowFile, rateControlAttributeName, attributeValue}); + session.transfer(flowFile, REL_FAILURE); + return; + } + rateValue = Long.parseLong(attributeValue); + break; + default: + throw new AssertionError("<Rate Control Criteria> property set to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue()); + } + + final String groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); + final String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); + Throttle throttle = throttleMap.get(groupName); + if (throttle == null) { + throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger); + + final String maxRateValue = context.getProperty(MAX_RATE).getValue(); + final long newRate; + if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) { + newRate = DataUnit.parseDataSize(maxRateValue, DataUnit.B).longValue(); + } else { + newRate = Long.parseLong(maxRateValue); + } + throttle.setMaxRate(newRate); + + throttleMap.put(groupName, throttle); + } + + throttle.lock(); + try { + if (throttle.tryAdd(rateValue)) { + logger.info("transferring {} to 'success'", new Object[]{flowFile}); + session.transfer(flowFile, REL_SUCCESS); + } else { + flowFile = session.penalize(flowFile); + session.transfer(flowFile); + } + } finally { + throttle.unlock(); + } + } + + private static class TimestampedLong { + + private final Long value; + private final long timestamp = System.currentTimeMillis(); + + public TimestampedLong(final Long value) { + this.value = value; + } + + 
// NOTE(review): TimestampedLong pairs a Long with its creation time (System.currentTimeMillis()) for
// aging inside the Throttle's TimedBuffer. RateEntityAccess is the TimedBuffer aggregation strategy
// (null-safe sum of values; timestamp of a null entity is 0). Throttle extends ReentrantLock so callers
// lock()/unlock() around tryAdd(): tryAdd refuses while penalized, refuses when the windowed sum already
// meets maxRate, and otherwise admits the value — penalizing future adds proportionally when the window
// total overshoots. The debug string at the end of the next physical line is split across the line break
// ("...not allowing / rate of {} through") — corruption introduced by the digest, not by the source.
public Long getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + } + + private static class RateEntityAccess implements EntityAccess<TimestampedLong> { + + @Override + public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) { + if (oldValue == null && toAdd == null) { + return new TimestampedLong(0L); + } else if (oldValue == null) { + return toAdd; + } else if (toAdd == null) { + return oldValue; + } + + return new TimestampedLong(oldValue.getValue() + toAdd.getValue()); + } + + @Override + public TimestampedLong createNew() { + return new TimestampedLong(0L); + } + + @Override + public long getTimestamp(TimestampedLong entity) { + return entity == null ? 0L : entity.getTimestamp(); + } + } + + private static class Throttle extends ReentrantLock { + + private final AtomicLong maxRate = new AtomicLong(1L); + private final long timePeriodValue; + private final TimeUnit timePeriodUnit; + private final TimedBuffer<TimestampedLong> timedBuffer; + private final ProcessorLog logger; + + private volatile long penalizationExpired; + private volatile long lastUpdateTime; + + public Throttle(final int timePeriod, final TimeUnit unit, final ProcessorLog logger) { + this.timePeriodUnit = unit; + this.timePeriodValue = timePeriod; + this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new RateEntityAccess()); + this.logger = logger; + } + + public void setMaxRate(final long maxRate) { + this.maxRate.set(maxRate); + } + + public long lastUpdateTime() { + return lastUpdateTime; + } + + public boolean tryAdd(final long value) { + final long now = System.currentTimeMillis(); + if (penalizationExpired > now) { + return false; + } + + final long maxRateValue = maxRate.get(); + + final TimestampedLong sum = timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit)); + if (sum != null && sum.getValue() >= maxRateValue) { + logger.debug("current sum for throttle is {}, so not allowing 
rate of {} through", new Object[]{sum.getValue(), value}); + return false; + } + + logger.debug("current sum for throttle is {}, so allowing rate of {} through", new Object[]{sum == null ? 0 : sum.getValue(), value}); + + final long transferred = timedBuffer.add(new TimestampedLong(value)).getValue(); + if (transferred > maxRateValue) { + final long amountOver = transferred - maxRateValue; + // determine how long it should take to transfer 'amountOver' and 'penalize' the Throttle for that long + final long milliDuration = TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit); + final double pct = (double) amountOver / (double) maxRateValue; + final long penalizationPeriod = (long) (milliDuration * pct); + this.penalizationExpired = now + penalizationPeriod; + logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, penalizationPeriod}); + } + + lastUpdateTime = now; + return true; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java index 0000000,d67179f..119a3f2 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertCharacterSet.java @@@ -1,0 -1,175 +1,175 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.EventDriven; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.behavior.SupportsBatching; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.util.StopWatch; 
+ + import java.io.*; + import java.nio.CharBuffer; + import java.nio.charset.Charset; + import java.nio.charset.CharsetDecoder; + import java.nio.charset.CharsetEncoder; + import java.nio.charset.CodingErrorAction; + import java.util.*; + import java.util.concurrent.TimeUnit; + + /** + * <p> + * This processor reads files in as text according to the specified character + * set and it outputs another text file according to the given characeter set. + * The character sets supported depend on the version of the JRE and is platform + * specific. In addition, the JVM can be expanded with additional character sets + * to support. More information on which character sets are supported can be + * found in the JDK documentation under the docs directory in the following + * path: ....\technotes\guides\intl\encoding.doc.html</p> + * + * <p> + * The conversion process is very passive. For conversions that do not map + * perfectly the conversion will replace unmappable or unrecognized input using + * the '?' character. 
+ * + * <p> + * The following properties are required: <ul> <li><b>input.charset</b> - The + * character set of the original file contents</li> <li><b>output.charset</b> - + * The character set of the resulting file</li> </ul> </p> + * + * <p> + * The following properties are optional: <ul> <li><b>N/A</b> - </li> </ul> + * </p> + * + * <p> + * The following relationships are required: <ul> <li><b>success</b> - the id of + * the processor to transfer successfully converted files</li> + * <li><b>failure</b> - the id of the processor to transfer unsuccessfully + * converted files</li> </ul> </p> + */ + @EventDriven + @SideEffectFree + @SupportsBatching + @Tags({"text", "convert", "characterset", "character set"}) + @CapabilityDescription("Converts a FlowFile's content from one character set to another") + public class ConvertCharacterSet extends AbstractProcessor { + + public static final PropertyDescriptor INPUT_CHARSET = new PropertyDescriptor.Builder() + .name("Input Character Set") + .description("The name of the CharacterSet to expect for Input") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .required(true) + .build(); + public static final PropertyDescriptor OUTPUT_CHARSET = new PropertyDescriptor.Builder() + .name("Output Character Set") + .description("The name of the CharacterSet to convert to") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .required(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("").build(); + + public static final int MAX_BUFFER_SIZE = 512 * 1024; + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new 
ArrayList<>(); + properties.add(INPUT_CHARSET); + properties.add(OUTPUT_CHARSET); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final ProcessorLog logger = getLogger(); + + final Charset inputCharset = Charset.forName(context.getProperty(INPUT_CHARSET).getValue()); + final Charset outputCharset = Charset.forName(context.getProperty(OUTPUT_CHARSET).getValue()); + final CharBuffer charBuffer = CharBuffer.allocate(MAX_BUFFER_SIZE); + + final CharsetDecoder decoder = inputCharset.newDecoder(); + decoder.onMalformedInput(CodingErrorAction.REPLACE); + decoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + decoder.replaceWith("?"); + + final CharsetEncoder encoder = outputCharset.newEncoder(); + encoder.onMalformedInput(CodingErrorAction.REPLACE); + encoder.onUnmappableCharacter(CodingErrorAction.REPLACE); + encoder.replaceWith("?".getBytes(outputCharset)); + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + final StopWatch stopWatch = new StopWatch(true); + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(rawIn, decoder), MAX_BUFFER_SIZE); + final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(rawOut, encoder), MAX_BUFFER_SIZE)) { + int charsRead; + while ((charsRead = reader.read(charBuffer)) != -1) { + charBuffer.flip(); + writer.write(charBuffer.array(), 0, charsRead); + } + + writer.flush(); + } + } + }); + + session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + logger.info("successfully converted characters from {} to {} for {}", new Object[]{ + context.getProperty(INPUT_CHARSET).getValue(), context.getProperty(OUTPUT_CHARSET).getValue(), flowFile}); + session.transfer(flowFile, REL_SUCCESS); + } catch (final Exception e) { + throw new ProcessException(e); + } + } + + }
