NIFI-1273 Adding ListenRELP processor, which includes refactoring code that was previously part of ListenSyslog into a reusable framework for implementing listener processors, as well as back-end code for handling the RELP protocol. Addressing some feedback from review and fixing logging statements. Addressing review feedback, added AsyncChannelDispatcher and made ChannelHandlerFactory use generics for the dispatcher. This closes #179
Signed-off-by: Matt Gilman <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1089f0a9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1089f0a9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1089f0a9 Branch: refs/heads/master Commit: 1089f0a956fbe1e302bb9d37fe6be05be5860646 Parents: 2f5f7b8 Author: Bryan Bende <[email protected]> Authored: Tue Jan 5 17:41:33 2016 -0500 Committer: Matt Gilman <[email protected]> Committed: Fri Jan 22 16:28:58 2016 -0500 ---------------------------------------------------------------------- nifi-commons/nifi-processor-utilities/pom.xml | 4 + .../listen/AbstractListenEventProcessor.java | 358 ++++++++++++ .../dispatcher/AsyncChannelDispatcher.java | 40 ++ .../listen/dispatcher/ChannelDispatcher.java | 51 ++ .../dispatcher/DatagramChannelDispatcher.java | 155 +++++ .../dispatcher/SocketChannelAttachment.java | 44 ++ .../dispatcher/SocketChannelDispatcher.java | 243 ++++++++ .../nifi/processor/util/listen/event/Event.java | 46 ++ .../util/listen/event/EventFactory.java | 44 ++ .../util/listen/event/EventFactoryUtil.java | 33 ++ .../util/listen/event/StandardEvent.java | 52 ++ .../util/listen/handler/ChannelHandler.java | 55 ++ .../listen/handler/ChannelHandlerFactory.java | 46 ++ .../handler/socket/SSLSocketChannelHandler.java | 152 +++++ .../handler/socket/SocketChannelHandler.java | 51 ++ .../socket/SocketChannelHandlerFactory.java | 55 ++ .../socket/StandardSocketChannelHandler.java | 157 +++++ .../util/listen/response/ChannelResponder.java | 50 ++ .../util/listen/response/ChannelResponse.java | 29 + .../socket/SSLSocketChannelResponder.java | 44 ++ .../response/socket/SocketChannelResponder.java | 69 +++ .../standard/AbstractSyslogProcessor.java | 33 +- .../processors/standard/HandleHttpResponse.java | 2 +- .../nifi/processors/standard/ListenRELP.java | 225 ++++++++ .../nifi/processors/standard/ListenSyslog.java | 576 
++----------------- .../nifi/processors/standard/ParseSyslog.java | 6 +- .../nifi/processors/standard/PutSyslog.java | 2 +- .../standard/relp/event/RELPEvent.java | 46 ++ .../standard/relp/event/RELPEventFactory.java | 37 ++ .../standard/relp/event/RELPMetadata.java | 27 + .../standard/relp/frame/RELPDecoder.java | 195 +++++++ .../standard/relp/frame/RELPEncoder.java | 66 +++ .../standard/relp/frame/RELPFrame.java | 110 ++++ .../standard/relp/frame/RELPFrameException.java | 32 ++ .../standard/relp/frame/RELPState.java | 30 + .../standard/relp/handler/RELPFrameHandler.java | 91 +++ .../handler/RELPSSLSocketChannelHandler.java | 88 +++ .../relp/handler/RELPSocketChannelHandler.java | 98 ++++ .../RELPSocketChannelHandlerFactory.java | 56 ++ .../relp/response/RELPChannelResponse.java | 42 ++ .../standard/relp/response/RELPResponse.java | 162 ++++++ .../standard/syslog/SyslogAttributes.java | 48 ++ .../processors/standard/syslog/SyslogEvent.java | 180 ++++++ .../standard/syslog/SyslogParser.java | 165 ++++++ .../processors/standard/util/SyslogEvent.java | 180 ------ .../processors/standard/util/SyslogParser.java | 165 ------ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestListenRELP.java | 241 ++++++++ .../processors/standard/TestListenSyslog.java | 70 +-- .../processors/standard/TestParseSyslog.java | 2 +- .../standard/relp/RELPFrameProducer.java | 103 ++++ .../relp/event/TestRELPEventFactory.java | 55 ++ .../standard/relp/frame/TestRELPDecoder.java | 138 +++++ .../standard/relp/frame/TestRELPEncoder.java | 89 +++ .../standard/relp/frame/TestRELPFrame.java | 47 ++ .../relp/handler/TestRELPFrameHandler.java | 163 ++++++ .../handler/TestRELPSocketChannelHandler.java | 209 +++++++ .../relp/response/TestRELPResponse.java | 169 ++++++ .../standard/util/TestSyslogParser.java | 2 + 59 files changed, 4793 insertions(+), 936 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/pom.xml b/nifi-commons/nifi-processor-utilities/pom.xml index e25d782..bd55ddd 100644 --- a/nifi-commons/nifi-processor-utilities/pom.xml +++ b/nifi-commons/nifi-processor-utilities/pom.xml @@ -36,5 +36,9 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-security-utils</artifactId> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java new file mode 100644 index 0000000..e994d81 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * An abstract processor to extend from when listening for events over a channel. This processor + * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, in a background + * thread which will end up placing events on a queue to polled by the onTrigger method. 
Sub-classes + * are responsible for providing the dispatcher implementations. + * + * @param <E> the type of events being produced + */ +public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor { + + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port to listen on for communication.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the received data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Receive Buffer Size") + .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " + + "incoming messages.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("65507 B") + .required(true) + .build(); + public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Socket Buffer") + .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + + "to indicate how big the socket buffer should be. 
If this value is set too low, the buffer may fill up before " + + "the data can be read, and incoming data will be dropped.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .required(true) + .build(); + + // Putting these properties here so sub-classes don't have to redefine them, but they are + // not added to the properties by default since not all processors may need them + + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Number of TCP Connections") + .description("The maximum number of concurrent TCP connections to accept.") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Max Batch Size") + .description( + "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with " + + "the <Message Delimiter> up to this configured maximum number of messages") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .required(true) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("\\n") + .required(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Messages received successfully will be sent out this relationship.") + .build(); + + public static final int POLL_TIMEOUT_MS = 100; + + private Set<Relationship> relationships; + private List<PropertyDescriptor> descriptors; + + protected volatile int port; 
+ protected volatile Charset charset; + protected volatile ChannelDispatcher dispatcher; + protected volatile BlockingQueue<E> events = new LinkedBlockingQueue<>(10); + protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(PORT); + descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_SOCKET_BUFFER_SIZE); + descriptors.add(CHARSET); + descriptors.addAll(getAdditionalProperties()); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.addAll(getAdditionalRelationships()); + this.relationships = Collections.unmodifiableSet(relationships); + } + + /** + * Override to provide additional relationships for the processor. + * + * @return a list of relationships + */ + protected List<Relationship> getAdditionalRelationships() { + return Collections.EMPTY_LIST; + } + + /** + * Override to provide additional properties for the processor. 
+ * + * @return a list of properties + */ + protected List<PropertyDescriptor> getAdditionalProperties() { + return Collections.EMPTY_LIST; + } + + @Override + public final Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + charset = Charset.forName(context.getProperty(CHARSET).getValue()); + port = context.getProperty(PORT).asInteger(); + + final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + + // create the dispatcher and call open() to bind to the given port + dispatcher = createDispatcher(context, events); + dispatcher.open(port, maxChannelBufferSize); + + // start a thread to run the dispatcher + final Thread readerThread = new Thread(dispatcher); + readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * @param context the ProcessContext to retrieve property values from + * @return a ChannelDispatcher to handle incoming connections + * + * @throws IOException if unable to listen on the requested port + */ + protected abstract ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<E> events) throws IOException; + + // used for testing to access the random port that was selected + public final int getDispatcherPort() { + return dispatcher == null ? 0 : dispatcher.getPort(); + } + + public int getErrorQueueSize() { + return errorEvents.size(); + } + + @OnUnscheduled + public void onUnscheduled() { + if (dispatcher != null) { + dispatcher.stop(); + dispatcher.close(); + } + } + + /** + * If pollErrorQueue is true, the error queue will be checked first and event will be + * returned from the error queue if available. 
+ * + * If pollErrorQueue is false, or no data is in the error queue, the regular queue is polled. + * + * If longPoll is true, the regular queue will be polled with a short timeout, otherwise it will + * poll with no timeout which will return immediately. + * + * @param longPoll whether or not to poll the main queue with a small timeout + * @param pollErrorQueue whether or not to poll the error queue first + * + * @return an event from one of the queues, or null if none are available + */ + protected E getMessage(final boolean longPoll, final boolean pollErrorQueue) { + E event = null; + if (pollErrorQueue) { + event = errorEvents.poll(); + } + + if (event == null) { + try { + if (longPoll) { + event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } else { + event = events.poll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + return event; + } + + /** + * Batches together up to the batchSize events. Events are grouped together based on a batch key which + * by default is the sender of the event, but can be override by sub-classes. + * + * This method will return when batchSize has been reached, or when no more events are available on the queue. 
+ * + * @param session the current session + * @param totalBatchSize the total number of events to process + * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile + * + * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all + * the batches will be <= batchSize + */ + protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize, + final byte[] messageDemarcatorBytes) { + + final Map<String,FlowFileEventBatch> batches = new HashMap<>(); + for (int i=0; i < totalBatchSize; i++) { + final E event = getMessage(true, true); + if (event == null) { + break; + } + + final String batchKey = getBatchKey(event); + FlowFileEventBatch batch = batches.get(batchKey); + + // if we don't have a batch for this key then create a new one + if (batch == null) { + batch = new FlowFileEventBatch(session.create(), new ArrayList<E>()); + batches.put(batchKey, batch); + } + + // add the current event to the batch + batch.getEvents().add(event); + + // append the event's data to the FlowFile, write the demarcator first if not on the first event + final boolean writeDemarcator = (i > 0); + try { + final byte[] rawMessage = event.getData(); + FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (writeDemarcator) { + out.write(messageDemarcatorBytes); + } + + out.write(rawMessage); + } + }); + + // update the FlowFile reference in the batch object + batch.setFlowFile(appendedFlowFile); + + } catch (final Exception e) { + getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", + new Object[] {e.getMessage()}, e); + errorEvents.offer(event); + break; + } + + session.adjustCounter("Messages Received", 1L, false); + } + + return batches; + } + + /** + * @param event an event 
that was pulled off the queue + * + * @return a key to use for batching events together, by default this uses the sender of the + * event, but sub-classes should override this to batch by something else + */ + protected String getBatchKey(final E event) { + return event.getSender(); + } + + /** + * Wrapper to hold a FlowFile and the events that have been appended to it. + */ + protected final class FlowFileEventBatch { + + private FlowFile flowFile; + private List<E> events; + + public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) { + this.flowFile = flowFile; + this.events = events; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public List<E> getEvents() { + return events; + } + + public void setFlowFile(FlowFile flowFile) { + this.flowFile = flowFile; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java new file mode 100644 index 0000000..5215a21 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.nio.channels.SelectionKey; + +/** + * A ChannelDispatcher that handles channels asynchronously. + */ +public interface AsyncChannelDispatcher extends ChannelDispatcher { + + /** + * Informs the dispatcher that the connection for the given key is complete. + * + * @param key a key that was previously selected + */ + void completeConnection(SelectionKey key); + + /** + * Informs the dispatcher that the connection for the given key can be added back for selection. + * + * @param key a key that was previously selected + */ + void addBackForSelection(SelectionKey key); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java new file mode 100644 index 0000000..001ee9b --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.io.IOException; + +/** + * Dispatches handlers for a given channel. + */ +public interface ChannelDispatcher extends Runnable { + + /** + * Opens the dispatcher listening on the given port and attempts to set the + * OS socket buffer to maxBufferSize. + * + * @param port the port to listen on + * @param maxBufferSize the size to set the OS socket buffer to + * @throws IOException if an error occurred listening on the given port + */ + void open(int port, int maxBufferSize) throws IOException; + + /** + * @return the port being listened to + */ + int getPort(); + + /** + * Stops the main dispatcher thread. + */ + void stop(); + + /** + * Closes all listeners and stops all handler threads. 
+ */ + void close(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java new file mode 100644 index 0000000..a00a39f --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for + * processing, otherwise the buffer is returned to the buffer pool. + */ +public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher { + + private final EventFactory<E> eventFactory; + private final BlockingQueue<ByteBuffer> bufferPool; + private final BlockingQueue<E> events; + private final ProcessorLog logger; + + private Selector selector; + private DatagramChannel datagramChannel; + private volatile boolean stopped = false; + + public DatagramChannelDispatcher(final EventFactory<E> eventFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ProcessorLog logger) { + this.eventFactory = eventFactory; + this.bufferPool = bufferPool; + this.events = events; + this.logger = logger; + + if (bufferPool == null || bufferPool.size() == 0) { + throw new IllegalArgumentException("A pool of available ByteBuffers is required"); + } + } + + @Override + public void open(final int port, int maxBufferSize) throws IOException { + datagramChannel = DatagramChannel.open(); + datagramChannel.configureBlocking(false); + if (maxBufferSize > 0) { + 
datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); + final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); + if (actualReceiveBufSize < maxBufferSize) { + logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + + "maximum receive buffer"); + } + } + datagramChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + datagramChannel.register(selector, SelectionKey.OP_READ); + } + + @Override + public void run() { + final ByteBuffer buffer = bufferPool.poll(); + while (!stopped) { + try { + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()) { + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()) { + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress socketAddress; + buffer.clear(); + while (!stopped && (socketAddress = channel.receive(buffer)) != null) { + String sender = ""; + if (socketAddress instanceof InetSocketAddress) { + sender = ((InetSocketAddress) socketAddress).getAddress().toString(); + } + + // create a byte array from the buffer + buffer.flip(); + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + + final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender); + final E event = eventFactory.create(bytes, metadata, null); + + // queue the raw message with the sender, block until space is available + events.put(event); + buffer.clear(); + } + } + } + } catch (InterruptedException e) { + stopped = true; + Thread.currentThread().interrupt(); + } catch (IOException e) { + logger.error("Error reading from DatagramChannel", e); + } + } + + if (buffer != null) { + try { + bufferPool.put(buffer); + 
} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public int getPort() { + return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); + } + + @Override + public void stop() { + selector.wakeup(); + stopped = true; + } + + @Override + public void close() { + IOUtils.closeQuietly(selector); + IOUtils.closeQuietly(datagramChannel); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java new file mode 100644 index 0000000..f2479f1 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.nio.ByteBuffer; + +/** + * Wrapper class so we can attach a buffer and/or an SSLSocketChannel to the selector key. + * */ +public class SocketChannelAttachment { + + private final ByteBuffer byteBuffer; + private final SSLSocketChannel sslSocketChannel; + + public SocketChannelAttachment(final ByteBuffer byteBuffer, final SSLSocketChannel sslSocketChannel) { + this.byteBuffer = byteBuffer; + this.sslSocketChannel = sslSocketChannel; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public SSLSocketChannel getSslSocketChannel() { + return sslSocketChannel; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java new file mode 100644 index 0000000..6dd6345 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.Channel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Accepts Socket connections on the given port and creates a handler for each connection to + * be executed by a thread pool. 
+ */ +public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements AsyncChannelDispatcher { + + private final EventFactory<E> eventFactory; + private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory; + private final BlockingQueue<ByteBuffer> bufferPool; + private final BlockingQueue<E> events; + private final ProcessorLog logger; + private final int maxConnections; + private final SSLContext sslContext; + private final Charset charset; + + private ExecutorService executor; + private volatile boolean stopped = false; + private Selector selector; + private final BlockingQueue<SelectionKey> keyQueue; + private final AtomicInteger currentConnections = new AtomicInteger(0); + + + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<E> events, + final ProcessorLog logger, + final int maxConnections, + final SSLContext sslContext, + final Charset charset) { + this.eventFactory = eventFactory; + this.handlerFactory = handlerFactory; + this.bufferPool = bufferPool; + this.events = events; + this.logger = logger; + this.maxConnections = maxConnections; + this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.sslContext = sslContext; + this.charset = charset; + + if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { + throw new IllegalArgumentException( + "A pool of available ByteBuffers equal to the maximum number of connections is required"); + } + } + + @Override + public void open(final int port, int maxBufferSize) throws IOException { + this.executor = Executors.newFixedThreadPool(maxConnections); + + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + if (maxBufferSize > 0) { + serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); + 
final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF); + if (actualReceiveBufSize < maxBufferSize) { + logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " + + actualReceiveBufSize + "bytes. You may want to consider changing the Operating System's " + + "maximum receive buffer"); + } + } + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + } + + @Override + public void run() { + while (!stopped) { + try { + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + if (key.isAcceptable()) { + // Handle new connections coming in + final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); + final SocketChannel socketChannel = channel.accept(); + // Check for available connections + if (currentConnections.incrementAndGet() > maxConnections){ + currentConnections.decrementAndGet(); + logger.warn("Rejecting connection from {} because max connections has been met", + new Object[]{ socketChannel.getRemoteAddress().toString() }); + IOUtils.closeQuietly(socketChannel); + continue; + } + logger.debug("Accepted incoming connection from {}", + new Object[]{socketChannel.getRemoteAddress().toString()}); + // Set socket to non-blocking, and register with selector + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + + // Prepare the byte buffer for the reads, clear it out + ByteBuffer buffer = bufferPool.poll(); + buffer.clear(); + buffer.mark(); + + // If we have an SSLContext then create an SSLEngine for the channel + SSLSocketChannel sslSocketChannel = null; + if (sslContext != 
null) { + final SSLEngine sslEngine = sslContext.createSSLEngine(); + sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel, false); + } + + // Attach the buffer and SSLSocketChannel to the key + SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslSocketChannel); + readKey.attach(attachment); + } else if (key.isReadable()) { + // Clear out the operations the select is interested in until done reading + key.interestOps(0); + // Create a handler based on the protocol and whether an SSLEngine was provided or not + final Runnable handler; + if (sslContext != null) { + handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger); + } else { + handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger); + } + + // run the handler + executor.execute(handler); + } + } + } + // Add back all idle sockets to the select + SelectionKey key; + while((key = keyQueue.poll()) != null){ + key.interestOps(SelectionKey.OP_READ); + } + } catch (IOException e) { + logger.error("Error accepting connection from SocketChannel", e); + } + } + } + + @Override + public int getPort() { + // Return the port for the key listening for accepts + for(SelectionKey key : selector.keys()){ + if (key.isValid()) { + final Channel channel = key.channel(); + if (channel instanceof ServerSocketChannel) { + return ((ServerSocketChannel)channel).socket().getLocalPort(); + } + } + } + return 0; + } + + @Override + public void stop() { + stopped = true; + selector.wakeup(); + } + + @Override + public void close() { + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executor.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + for(SelectionKey key : selector.keys()){ 
+ IOUtils.closeQuietly(key.channel()); + } + IOUtils.closeQuietly(selector); + } + + @Override + public void completeConnection(SelectionKey key) { + // connection is done. Return the buffer to the pool + SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + try { + bufferPool.put(attachment.getByteBuffer()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + currentConnections.decrementAndGet(); + } + + @Override + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java new file mode 100644 index 0000000..83989f8 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.nio.channels.SelectableChannel; + +/** + * An event that was read from a channel. + * + * @param <C> the type of SelectableChannel the event was read from + */ +public interface Event<C extends SelectableChannel> { + + /** + * @return the sending host of the data + */ + String getSender(); + + /** + * @return raw data for this event + */ + byte[] getData(); + + /** + * @return the responder to use for responding to this event, or null + * if responses are not supported + */ + ChannelResponder<C> getResponder(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java new file mode 100644 index 0000000..1bd9f0d --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.util.Map; + +/** + * Factory to create instances of a given type of Event. + */ +public interface EventFactory<E extends Event> { + + /** + * The key in the metadata map for the sender. + */ + String SENDER_KEY = "sender"; + + /** + * Creates an event for the given data and metadata. + * + * @param data raw data from a channel + * @param metadata additional metadata + * @param responder a responder for the event with the channel populated + * + * @return an instance of the given type + */ + E create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java new file mode 100644 index 0000000..54529cf --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import java.util.HashMap; +import java.util.Map; + +/** + * Utility methods for EventFactory. + */ +public class EventFactoryUtil { + + public static Map<String,String> createMapWithSender(final String sender) { + Map<String,String> metadata = new HashMap<>(); + metadata.put(EventFactory.SENDER_KEY, sender); + return metadata; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java new file mode 100644 index 0000000..fa3699e --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.event; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.nio.channels.SelectableChannel; + +/** + * Standard implementation of Event. + */ +public class StandardEvent<C extends SelectableChannel> implements Event<C> { + + private final String sender; + private final byte[] data; + private final ChannelResponder<C> responder; + + public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) { + this.sender = sender; + this.data = data; + this.responder = responder; + } + + @Override + public String getSender() { + return sender; + } + + @Override + public byte[] getData() { + return data; + } + + public ChannelResponder<C> getResponder() { + return responder; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java 
new file mode 100644 index 0000000..ab4346f --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; + +import java.nio.channels.SelectionKey; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Base class for all channel handlers. 
+ */ +public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable { + + protected final SelectionKey key; + protected final D dispatcher; + protected final Charset charset; + protected final EventFactory<E> eventFactory; + protected final BlockingQueue<E> events; + protected final ProcessorLog logger; + + + public ChannelHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + this.key = key; + this.dispatcher = dispatcher; + this.charset = charset; + this.eventFactory = eventFactory; + this.events = events; + this.logger = logger; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java new file mode 100644 index 0000000..d7916fd --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; + +import java.nio.channels.SelectionKey; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher. + */ +public interface ChannelHandlerFactory<E extends Event, D extends ChannelDispatcher> { + + ChannelHandler<E, D> createHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger); + + ChannelHandler<E, D> createSSLHandler(final SelectionKey key, + final D dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java new file mode 100644 index 0000000..6a2c6f8 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. 
+ */ +public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { + + private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); + + public SSLSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public void run() { + boolean eof = false; + SSLSocketChannel sslSocketChannel = null; + try { + int bytesRead; + final SocketChannel socketChannel = (SocketChannel) key.channel(); + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + + // get the SSLSocketChannel from the attachment + sslSocketChannel = attachment.getSslSocketChannel(); + + // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] + final ByteBuffer socketBuffer = attachment.getByteBuffer(); + byte[] socketBufferArray = new byte[socketBuffer.limit()]; + + // read until no more data + try { + while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { + processBuffer(sslSocketChannel, socketChannel, bytesRead, socketBufferArray); + logger.debug("bytes read from sslSocketChannel {}", new Object[]{bytesRead}); + } + } catch (SocketTimeoutException ste) { + // SSLSocketChannel will throw this exception when 0 bytes are read and the timeout threshold + // is exceeded, we don't want to close the connection in this case + bytesRead = 0; + } + + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; + logger.debug("Reached EOF, closing connection"); + } else { + logger.debug("No more data available, returning for selection"); + } + } catch (ClosedByInterruptException | InterruptedException e) { + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = 
true; + } catch (ClosedChannelException e) { + // ClosedChannelException doesn't have a message so handle it separately from IOException + logger.error("Error reading from channel due to channel being closed", e); + // Treat same as closed socket + eof = true; + } catch (IOException e) { + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); + // Treat same as closed socket + eof = true; + } finally { + if(eof == true) { + IOUtils.closeQuietly(sslSocketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } + } + } + + /** + * Process the contents of the buffer. Give sub-classes a chance to override this behavior. + * + * @param sslSocketChannel the channel the data was read from + * @param socketChannel the socket channel being wrapped by sslSocketChannel + * @param bytesRead the number of bytes read + * @param buffer the buffer to process + * @throws InterruptedException thrown if interrupted while queuing events + */ + protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, + final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { + final InetAddress sender = socketChannel.socket().getInetAddress(); + + // go through the buffer looking for the end of each message + for (int i = 0; i < bytesRead; i++) { + final byte currByte = buffer[i]; + currBytes.write(currByte); + + // check if at end of a message + if (currByte == getDelimiter()) { + final SSLSocketChannelResponder response = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); + final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + + // queue the raw event blocking until space is available, reset the temporary buffer + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.put(event); + currBytes.reset(); + } + } + } + + @Override + public byte getDelimiter() { + 
return TCP_DELIMITER; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java new file mode 100644 index 0000000..d20aaa6 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandler; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; +
+/**
+ * Common parent of the handlers that read delimited messages from a SocketChannel on behalf of
+ * an AsyncChannelDispatcher.
+ */
+public abstract class SocketChannelHandler<E extends Event<SocketChannel>> extends ChannelHandler<E, AsyncChannelDispatcher> {
+
+    /** Newline, available to sub-classes as a message delimiter. */
+    static final byte TCP_DELIMITER = '\n';
+
+    /**
+     * Passes all arguments through to ChannelHandler unchanged.
+     */
+    public SocketChannelHandler(final SelectionKey key,
+                                final AsyncChannelDispatcher dispatcher,
+                                final Charset charset,
+                                final EventFactory<E> eventFactory,
+                                final BlockingQueue<E> events,
+                                final ProcessorLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+    }
+
+    /**
+     * @return the byte that separates one message from the next for this handler
+     */
+    public abstract byte getDelimiter();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java new file mode 100644 index 0000000..cb189ba --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java @@ -0,0 +1,55 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandler; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Default factory for creating socket channel handlers. 
+ */ +public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> { + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + return new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + return new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java new file mode 100644 index 0000000..f12e705 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.handler.socket; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; +import org.apache.nifi.stream.io.ByteArrayOutputStream; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Reads from the given SocketChannel into the provided buffer. If the given delimiter is found, the data + * read up to that point is queued for processing. 
+ */ +public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { + + private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); + + public StandardSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public void run() { + boolean eof = false; + SocketChannel socketChannel = null; + + try { + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + + final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); + final ByteBuffer socketBuffer = attachment.getByteBuffer(); + + // read until the buffer is full + while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { + // prepare byte buffer for reading + socketBuffer.flip(); + // mark the current position as start, in case of partial message read + socketBuffer.mark(); + // process the contents that have been read into the buffer + processBuffer(socketChannel, socketBuffer); + + // Preserve bytes in buffer for next call to run + // NOTE: This code could benefit from the two ByteBuffer read calls to avoid + // this compact for higher throughput + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("bytes read {}", new Object[]{bytesRead}); + } + + // Check for closed socket + if( bytesRead < 0 ){ + eof = true; + logger.debug("Reached EOF, closing connection"); + } else { + logger.debug("No more data available, returning for selection"); + } + } catch (ClosedByInterruptException | InterruptedException e) { + logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket + eof = true; + } catch (ClosedChannelException e) { + // ClosedChannelException doesn't have a message so handle it separately from IOException + 
logger.error("Error reading from channel due to channel being closed", e); + // Treat same as closed socket + eof = true; + } catch (IOException e) { + logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); + // Treat same as closed socket + eof = true; + } finally { + if(eof == true) { + IOUtils.closeQuietly(socketChannel); + dispatcher.completeConnection(key); + } else { + dispatcher.addBackForSelection(key); + } + } + } + + /** + * Process the contents that have been read into the buffer. Allow sub-classes to override this behavior. + * + * @param socketChannel the channel the data was read from + * @param socketBuffer the buffer the data was read into + * @throws InterruptedException if interrupted when queuing events + */ + protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) throws InterruptedException, IOException { + // get total bytes in buffer + final int total = socketBuffer.remaining(); + final InetAddress sender = socketChannel.socket().getInetAddress(); + + // go through the buffer looking for the end of each message + currBytes.reset(); + for (int i = 0; i < total; i++) { + // NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved + // Pull data out of buffer and cram into byte array + byte currByte = socketBuffer.get(); + currBytes.write(currByte); + + // check if at end of a message + if (currByte == getDelimiter()) { + final SocketChannelResponder response = new SocketChannelResponder(socketChannel); + final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + + // queue the raw event blocking until space is available, reset the buffer + final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); + events.put(event); + currBytes.reset(); + + // Mark this as the start of the next message + socketBuffer.mark(); + } + } + } + + @Override + public byte getDelimiter() { + return 
TCP_DELIMITER; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java new file mode 100644 index 0000000..978f3ac --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.util.List; + +/** + * A responder for a given channel. + * + * @param <C> The type of SelectableChannel where the response will be written. 
+ */ +public interface ChannelResponder<C extends SelectableChannel> { + + /** + * @return a SelectableChannel to write the response to + */ + C getChannel(); + + /** + * @return a list of responses to write to the channel + */ + List<ChannelResponse> getResponses(); + + /** + * @param response adds the given response to the list of responses + */ + void addResponse(ChannelResponse response); + + /** + * Writes the responses to the underlying channel. + */ + void respond() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java new file mode 100644 index 0000000..98f0301 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * A single response that can be sent back over a channel, exposed in its serialized byte form.
 */
public interface ChannelResponse {

    /**
     * Serializes this response.
     *
     * @return the bytes that should be written to a channel for this response
     */
    byte[] toByteArray();

}
