http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
new file mode 100644
index 0000000..20102ba
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response.socket;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A ChannelResponder for SSLSocketChannels.
+ */
+public class SSLSocketChannelResponder extends SocketChannelResponder {
+
+    private SSLSocketChannel sslSocketChannel;
+
+    public SSLSocketChannelResponder(final SocketChannel socketChannel, final 
SSLSocketChannel sslSocketChannel) {
+        super(socketChannel);
+        this.sslSocketChannel = sslSocketChannel;
+    }
+
+    @Override
+    public void respond() throws IOException {
+        for (final ChannelResponse response : responses) {
+            sslSocketChannel.write(response.toByteArray());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
new file mode 100644
index 0000000..5c20bf0
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response.socket;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A ChannelResponder for SocketChannels. The SocketChannel should first be 
registered with a selector,
+ * upon being selected for writing the respond() method should be executed.
+ */
+public class SocketChannelResponder implements ChannelResponder<SocketChannel> 
{
+
+    protected final List<ChannelResponse> responses;
+    protected final SocketChannel socketChannel;
+
+    public SocketChannelResponder(final SocketChannel socketChannel) {
+        this.responses = new ArrayList<>();
+        this.socketChannel = socketChannel;
+    }
+
+    @Override
+    public SocketChannel getChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public List<ChannelResponse> getResponses() {
+        return Collections.unmodifiableList(responses);
+    }
+
+    @Override
+    public void addResponse(ChannelResponse response) {
+        this.responses.add(response);
+    }
+
+    @Override
+    public void respond() throws IOException {
+        for (final ChannelResponse response : responses) {
+            final ByteBuffer responseBuffer = 
ByteBuffer.wrap(response.toByteArray());
+
+            while (responseBuffer.hasRemaining()) {
+                socketChannel.write(responseBuffer);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
index 70d3f22..e51ba6c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.util.StandardValidators;
 
@@ -32,7 +31,7 @@ public abstract class AbstractSyslogProcessor extends 
AbstractProcessor {
 
     public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
             .Builder().name("Protocol")
-            .description("The protocol for Syslog communication, either TCP or 
UDP.")
+            .description("The protocol for Syslog communication.")
             .required(true)
             .allowableValues(TCP_VALUE, UDP_VALUE)
             .defaultValue(UDP_VALUE.getValue())
@@ -45,39 +44,11 @@ public abstract class AbstractSyslogProcessor extends 
AbstractProcessor {
             .build();
     public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
             .name("Character Set")
-            .description("Specifies which character set of the Syslog 
messages")
+            .description("Specifies the character set of the Syslog messages")
             .required(true)
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
 
-    /**
-     * FlowFile Attributes for each Syslog message.
-     */
-    public enum SyslogAttributes implements FlowFileAttributeKey {
-        PRIORITY("syslog.priority"),
-        SEVERITY("syslog.severity"),
-        FACILITY("syslog.facility"),
-        VERSION("syslog.version"),
-        TIMESTAMP("syslog.timestamp"),
-        HOSTNAME("syslog.hostname"),
-        SENDER("syslog.sender"),
-        BODY("syslog.body"),
-        VALID("syslog.valid"),
-        PROTOCOL("syslog.protocol"),
-        PORT("syslog.port");
-
-        private String key;
-
-        SyslogAttributes(String key) {
-            this.key = key;
-        }
-
-        @Override
-        public String key() {
-            return key;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
index a4317dc..ecc05ee 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
@@ -125,7 +125,7 @@ public class HandleHttpResponse extends AbstractProcessor {
         final String statusCodeValue = 
context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
         if (!isNumber(statusCodeValue)) {
             session.transfer(flowFile, REL_FAILURE);
-            getLogger().error("Failed to response to HTTP request for {} 
because status code was '{}', which is not a valid number", new 
Object[]{flowFile, statusCodeValue});
+            getLogger().error("Failed to respond to HTTP request for {} 
because status code was '{}', which is not a valid number", new 
Object[]{flowFile, statusCodeValue});
         }
 
         final HttpContextMap contextMap = 
context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
new file mode 100644
index 0000000..99e1830
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPEvent;
+import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
+import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
+import 
org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"listen", "relp", "tcp", "logs"})
+@CapabilityDescription("Listens for RELP messages being sent to a given port 
over TCP. Each message will be " +
+        "acknowledged after successfully writing the message to a FlowFile. 
Each FlowFile will contain data " +
+        "portion of one or more RELP frames. In the case where the RELP frames 
contain syslog messages, the " +
+        "output of this processor can be sent to a ParseSyslog processor for 
further processing.")
+@WritesAttributes({
+        @WritesAttribute(attribute="relp.command", description="The command of 
the RELP frames."),
+        @WritesAttribute(attribute="relp.sender", description="The sending 
host of the messages."),
+        @WritesAttribute(attribute="relp.port", description="The sending port 
the messages were received over."),
+        @WritesAttribute(attribute="relp.txnr", description="The transaction 
number of the message. Only included if <Batch Size> is 1."),
+        @WritesAttribute(attribute="mime.type", description="The mime.type of 
the content which is text/plain")
+    })
+@SeeAlso({ParseSyslog.class})
+public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> {
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The Controller Service to use in order to obtain an 
SSL Context. If this property is set, " +
+                    "messages will be received over a secure connection.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    private volatile RELPEncoder relpEncoder;
+    private volatile byte[] messageDemarcatorBytes; //it is only the array 
reference that is volatile - not the contents.
+
+    @Override
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, 
MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE);
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        super.onScheduled(context);
+        // wanted to ensure charset was already populated here
+        relpEncoder = new RELPEncoder(charset);
+
+        final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+    }
+
+    @Override
+    protected ChannelDispatcher createDispatcher(final ProcessContext context, 
final BlockingQueue<RELPEvent> events) throws IOException {
+        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
+        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> 
handlerFactory = new RELPSocketChannelHandlerFactory<>();
+
+        final int maxConnections = 
context.getProperty(MAX_CONNECTIONS).asInteger();
+        final int bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
+
+        // initialize the buffer pool based on max number of connections and 
the buffer size
+        final LinkedBlockingQueue<ByteBuffer> bufferPool = new 
LinkedBlockingQueue<>(maxConnections);
+        for (int i = 0; i < maxConnections; i++) {
+            bufferPool.offer(ByteBuffer.allocate(bufferSize));
+        }
+
+        // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
+        SSLContext sslContext = null;
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+        }
+
+        // if we decide to support SSL then get the context and pass it in here
+        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
bufferPool, events,
+                getLogger(), maxConnections, sslContext, charSet);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final Map<String,FlowFileEventBatch> batches = getBatches(session, 
maxBatchSize, messageDemarcatorBytes);
+
+        // if the size is 0 then there was nothing to process so yield and 
return
+        if (batches.size() == 0) {
+            context.yield();
+            return;
+        }
+
+        for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPEvent> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", new Object[] {entry.getKey()});
+                continue;
+            }
+
+            // the sender and command will be the same for all events based on 
the batch key
+            final String sender = events.get(0).getSender();
+            final String command = events.get(0).getCommand();
+
+            final int numAttributes = events.size() == 1 ? 5 : 4;
+
+            final Map<String,String> attributes = new HashMap<>(numAttributes);
+            attributes.put(RELPAttributes.COMMAND.key(), command);
+            attributes.put(RELPAttributes.SENDER.key(), sender);
+            attributes.put(RELPAttributes.PORT.key(), String.valueOf(port));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+            // if there was only one event then we can pass on the transaction
+            // NOTE: we could pass on all the transaction ids joined together
+            if (events.size() == 1) {
+                attributes.put(RELPAttributes.TXNR.key(), 
String.valueOf(events.get(0).getTxnr()));
+            }
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
+            session.transfer(flowFile, REL_SUCCESS);
+
+            // create a provenance receive event
+            final String senderHost = sender.startsWith("/") && 
sender.length() > 1 ? sender.substring(1) : sender;
+            final String transitUri = new 
StringBuilder().append("relp").append("://").append(senderHost).append(":")
+                    .append(port).toString();
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+            // commit the session to guarantee the data has been delivered
+            session.commit();
+
+            // respond to each event to acknowledge successful receipt
+            for (final RELPEvent event : events) {
+                respond(event, RELPResponse.ok(event.getTxnr()));
+            }
+        }
+    }
+
+    @Override
+    protected String getBatchKey(RELPEvent event) {
+        return event.getSender() + "_" + event.getCommand();
+    }
+
+    protected void respond(final RELPEvent event, final RELPResponse 
relpResponse) {
+        final ChannelResponse response = new RELPChannelResponse(relpEncoder, 
relpResponse);
+
+        final ChannelResponder responder = event.getResponder();
+        responder.addResponse(response);
+        try {
+            responder.respond();
+        } catch (IOException e) {
+            getLogger().error("Error sending response for transaction {} due 
to {}",
+                    new Object[] {event.getTxnr(), e.getMessage()}, e);
+        }
+    }
+
+    public enum RELPAttributes implements FlowFileAttributeKey {
+        TXNR("relp.txnr"),
+        COMMAND("relp.command"),
+        SENDER("relp.sender"),
+        PORT("relp.port");
+
+        private final String key;
+
+        RELPAttributes(String key) {
+            this.key = key;
+        }
+
+        @Override
+        public String key() {
+            return key;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index e1e0e91..2e439b4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -31,7 +30,6 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -40,26 +38,25 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import 
org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
+import org.apache.nifi.processors.standard.syslog.SyslogEvent;
+import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SelectableChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -67,16 +64,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -175,7 +168,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
 
-    private volatile ChannelReader channelReader;
+    private volatile ChannelDispatcher channelDispatcher;
     private volatile SyslogParser parser;
     private volatile BlockingQueue<ByteBuffer> bufferPool;
     private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new 
LinkedBlockingQueue<>(10);
@@ -255,11 +248,10 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final String charSet = context.getProperty(CHARSET).getValue();
         final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
-        final String charsetName = context.getProperty(CHARSET).getValue();
-        messageDemarcatorBytes = 
msgDemarcator.getBytes(Charset.forName(charsetName));
+        messageDemarcatorBytes = 
msgDemarcator.getBytes(Charset.forName(charSet));
 
         final int maxConnections;
-        if (protocol.equals(UDP_VALUE.getValue())) {
+        if (UDP_VALUE.getValue().equals(protocol)) {
             maxConnections = 1;
         } else {
             maxConnections = 
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
@@ -274,10 +266,10 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        channelReader = createChannelReader(protocol, bufferPool, 
syslogEvents, maxConnections, sslContextService);
-        channelReader.open(port, maxChannelBufferSize);
+        channelDispatcher = createChannelReader(protocol, bufferPool, 
syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
+        channelDispatcher.open(port, maxChannelBufferSize);
 
-        final Thread readerThread = new Thread(channelReader);
+        final Thread readerThread = new Thread(channelDispatcher);
         readerThread.setName("ListenSyslog [" + getIdentifier() + "]");
         readerThread.setDaemon(true);
         readerThread.start();
@@ -288,12 +280,15 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
         return parser;
     }
 
-    // visible for testing to be overridden and provide a mock ChannelReader 
if desired
-    protected ChannelReader createChannelReader(final String protocol, final 
BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> 
syslogEvents,
-        int maxConnections, final SSLContextService sslContextService)
-            throws IOException {
-        if (protocol.equals(UDP_VALUE.getValue())) {
-            return new DatagramChannelReader(bufferPool, syslogEvents, 
getLogger());
+    // visible for testing to be overridden and provide a mock 
ChannelDispatcher if desired
+    protected ChannelDispatcher createChannelReader(final String protocol, 
final BlockingQueue<ByteBuffer> bufferPool,
+                                                    final 
BlockingQueue<RawSyslogEvent> events, final int maxConnections,
+                                                    final SSLContextService 
sslContextService, final Charset charset) throws IOException {
+
+        final EventFactory<RawSyslogEvent> eventFactory = new 
RawSyslogEventFactory();
+
+        if (UDP_VALUE.getValue().equals(protocol)) {
+            return new DatagramChannelDispatcher(eventFactory, bufferPool, 
events, getLogger());
         } else {
             // if an SSLContextService was provided then create an SSLContext 
to pass down to the dispatcher
             SSLContext sslContext = null;
@@ -301,20 +296,21 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
                 sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
             }
 
-            return new SocketChannelDispatcher(bufferPool, syslogEvents, 
getLogger(), maxConnections, sslContext);
+            final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, 
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
+            return new SocketChannelDispatcher(eventFactory, handlerFactory, 
bufferPool, events, getLogger(), maxConnections, sslContext, charset);
         }
     }
 
     // used for testing to access the random port that was selected
     protected int getPort() {
-        return channelReader == null ? 0 : channelReader.getPort();
+        return channelDispatcher == null ? 0 : channelDispatcher.getPort();
     }
 
     @OnUnscheduled
     public void onUnscheduled() {
-        if (channelReader != null) {
-            channelReader.stop();
-            channelReader.close();
+        if (channelDispatcher != null) {
+            channelDispatcher.stop();
+            channelDispatcher.close();
         }
     }
 
@@ -394,7 +390,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             if (shouldParse) {
                 boolean valid = true;
                 try {
-                    event = parser.parseEvent(rawSyslogEvent.getRawMessage(), 
sender);
+                    event = parser.parseEvent(rawSyslogEvent.getData(), 
sender);
                 } catch (final ProcessException pe) {
                     getLogger().warn("Failed to parse Syslog event; routing to 
invalid");
                     valid = false;
@@ -411,7 +407,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                     }
 
                     try {
-                        final byte[] rawBytes = rawSyslogEvent.getRawMessage();
+                        final byte[] rawBytes = rawSyslogEvent.getData();
                         invalidFlowFile = session.write(invalidFlowFile, new 
OutputStreamCallback() {
                             @Override
                             public void process(final OutputStream out) throws 
IOException {
@@ -449,7 +445,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
             try {
                 // write the raw bytes of the message as the FlowFile content
-                final byte[] rawMessage = (event == null) ? 
rawSyslogEvent.getRawMessage() : event.getRawMessage();
+                final byte[] rawMessage = (event == null) ? 
rawSyslogEvent.getData() : event.getRawMessage();
                 flowFile = session.append(flowFile, new OutputStreamCallback() 
{
                     @Override
                     public void process(final OutputStream out) throws 
IOException {
@@ -495,505 +491,47 @@ public class ListenSyslog extends 
AbstractSyslogProcessor {
     }
 
     /**
-     * Reads messages from a channel until told to stop.
-     */
-    private interface ChannelReader extends Runnable {
-
-        void open(int port, int maxBufferSize) throws IOException;
-
-        int getPort();
-
-        void stop();
-
-        void close();
-    }
-
-    /**
-     * Reads from the Datagram channel into an available buffer. If data is 
read then the buffer is queued for
-     * processing, otherwise the buffer is returned to the buffer pool.
-     */
-    private static class DatagramChannelReader implements ChannelReader {
-
-        private final BlockingQueue<ByteBuffer> bufferPool;
-        private final BlockingQueue<RawSyslogEvent> syslogEvents;
-        private final ProcessorLog logger;
-        private DatagramChannel datagramChannel;
-        private volatile boolean stopped = false;
-        private Selector selector;
-
-        public DatagramChannelReader(final BlockingQueue<ByteBuffer> 
bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final 
ProcessorLog logger) {
-            this.bufferPool = bufferPool;
-            this.syslogEvents = syslogEvents;
-            this.logger = logger;
-        }
-
-        @Override
-        public void open(final int port, int maxBufferSize) throws IOException 
{
-            datagramChannel = DatagramChannel.open();
-            datagramChannel.configureBlocking(false);
-            if (maxBufferSize > 0) {
-                datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
-                final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-                if (actualReceiveBufSize < maxBufferSize) {
-                    logMaxBufferWarning(logger, maxBufferSize, 
actualReceiveBufSize);
-                }
-            }
-            datagramChannel.socket().bind(new InetSocketAddress(port));
-            selector = Selector.open();
-            datagramChannel.register(selector, SelectionKey.OP_READ);
-        }
-
-        @Override
-        public void run() {
-            final ByteBuffer buffer = bufferPool.poll();
-            while (!stopped) {
-                try {
-                    int selected = selector.select();
-                    if (selected > 0){
-                        Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                        while (selectorKeys.hasNext()) {
-                            SelectionKey key = selectorKeys.next();
-                            selectorKeys.remove();
-                            if (!key.isValid()) {
-                                continue;
-                            }
-                            DatagramChannel channel = (DatagramChannel) 
key.channel();
-                            SocketAddress socketAddress;
-                            buffer.clear();
-                            while (!stopped && (socketAddress = 
channel.receive(buffer)) != null) {
-                                String sender = "";
-                                if (socketAddress instanceof 
InetSocketAddress) {
-                                    sender = ((InetSocketAddress) 
socketAddress).getAddress().toString();
-                                }
-
-                                // create a byte array from the buffer
-                                buffer.flip();
-                                byte bytes[] = new byte[buffer.limit()];
-                                buffer.get(bytes, 0, buffer.limit());
-
-                                // queue the raw message with the sender, 
block until space is available
-                                syslogEvents.put(new RawSyslogEvent(bytes, 
sender));
-                                buffer.clear();
-                            }
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    stopped = true;
-                } catch (IOException e) {
-                    logger.error("Error reading from DatagramChannel", e);
-                }
-            }
-
-            if (buffer != null) {
-                try {
-                    bufferPool.put(buffer);
-                } catch (InterruptedException e) {
-                    // nothing to do here
-                }
-            }
-        }
-
-        @Override
-        public int getPort() {
-            return datagramChannel == null ? 0 : 
datagramChannel.socket().getLocalPort();
-        }
-
-        @Override
-        public void stop() {
-            selector.wakeup();
-            stopped = true;
-        }
-
-        @Override
-        public void close() {
-            IOUtils.closeQuietly(selector);
-            IOUtils.closeQuietly(datagramChannel);
-        }
-    }
-
-    /**
-     * Accepts Socket connections on the given port and creates a handler for 
each connection to
-     * be executed by a thread pool.
+     * Wrapper class to pass around the raw message and the host/ip that sent 
it
      */
-    private static class SocketChannelDispatcher implements ChannelReader {
-
-        private final BlockingQueue<ByteBuffer> bufferPool;
-        private final BlockingQueue<RawSyslogEvent> syslogEvents;
-        private final ProcessorLog logger;
-        private final ExecutorService executor;
-        private volatile boolean stopped = false;
-        private Selector selector;
-        private final BlockingQueue<SelectionKey> keyQueue;
-        private final int maxConnections;
-        private final AtomicInteger currentConnections = new AtomicInteger(0);
-        private final SSLContext sslContext;
-
-        public SocketChannelDispatcher(final BlockingQueue<ByteBuffer> 
bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents,
-                                       final ProcessorLog logger, final int 
maxConnections, final SSLContext sslContext) {
-            this.bufferPool = bufferPool;
-            this.syslogEvents = syslogEvents;
-            this.logger = logger;
-            this.maxConnections = maxConnections;
-            this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
-            this.sslContext = sslContext;
-            this.executor = Executors.newFixedThreadPool(maxConnections);
-        }
-
-        @Override
-        public void open(final int port, int maxBufferSize) throws IOException 
{
-            final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-            serverSocketChannel.configureBlocking(false);
-            if (maxBufferSize > 0) {
-                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
-                final int actualReceiveBufSize = 
serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-                if (actualReceiveBufSize < maxBufferSize) {
-                    logMaxBufferWarning(logger, maxBufferSize, 
actualReceiveBufSize);
-                }
-            }
-            serverSocketChannel.socket().bind(new InetSocketAddress(port));
-            selector = Selector.open();
-            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-        }
+    static class RawSyslogEvent<C extends SelectableChannel> implements 
Event<C> {
 
-        @Override
-        public void run() {
-            while (!stopped) {
-                try {
-                    int selected = selector.select();
-                    if (selected > 0){
-                        Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                        while (selectorKeys.hasNext()){
-                            SelectionKey key = selectorKeys.next();
-                            selectorKeys.remove();
-                            if (!key.isValid()){
-                                continue;
-                            }
-                            if (key.isAcceptable()) {
-                                // Handle new connections coming in
-                                final ServerSocketChannel channel = 
(ServerSocketChannel) key.channel();
-                                final SocketChannel socketChannel = 
channel.accept();
-                                // Check for available connections
-                                if (currentConnections.incrementAndGet() > 
maxConnections){
-                                    currentConnections.decrementAndGet();
-                                    logger.warn("Rejecting connection from {} 
because max connections has been met",
-                                            new Object[]{ 
socketChannel.getRemoteAddress().toString() });
-                                    IOUtils.closeQuietly(socketChannel);
-                                    continue;
-                                }
-                                logger.debug("Accepted incoming connection 
from {}",
-                                        new 
Object[]{socketChannel.getRemoteAddress().toString()});
-                                // Set socket to non-blocking, and register 
with selector
-                                socketChannel.configureBlocking(false);
-                                SelectionKey readKey = 
socketChannel.register(selector, SelectionKey.OP_READ);
-
-                                // Prepare the byte buffer for the reads, 
clear it out
-                                ByteBuffer buffer = bufferPool.poll();
-                                buffer.clear();
-                                buffer.mark();
-
-                                // If we have an SSLContext then create an 
SSLEngine for the channel
-                                SSLEngine sslEngine = null;
-                                if (sslContext != null) {
-                                    sslEngine = sslContext.createSSLEngine();
-                                }
-
-                                // Attach the buffer and SSLEngine to the key
-                                SocketChannelAttachment attachment = new 
SocketChannelAttachment(buffer, sslEngine);
-                                readKey.attach(attachment);
-                            } else if (key.isReadable()) {
-                                // Clear out the operations the select is 
interested in until done reading
-                                key.interestOps(0);
-                                // Create a handler based on whether an 
SSLEngine was provided or not
-                                final Runnable handler;
-                                if (sslContext != null) {
-                                    handler = new SSLSocketChannelHandler(key, 
this, syslogEvents, logger);
-                                } else {
-                                    handler = new SocketChannelHandler(key, 
this, syslogEvents, logger);
-                                }
-                                // run the handler
-                                executor.execute(handler);
-                            }
-                        }
-                    }
-                    // Add back all idle sockets to the select
-                    SelectionKey key;
-                    while((key = keyQueue.poll()) != null){
-                        key.interestOps(SelectionKey.OP_READ);
-                    }
-                } catch (IOException e) {
-                    logger.error("Error accepting connection from 
SocketChannel", e);
-                }
-            }
-        }
+        final byte[] rawMessage;
+        final String sender;
 
-        @Override
-        public int getPort() {
-            // Return the port for the key listening for accepts
-            for(SelectionKey key : selector.keys()){
-                if (key.isValid()) {
-                    final Channel channel = key.channel();
-                    if (channel instanceof  ServerSocketChannel) {
-                        return 
((ServerSocketChannel)channel).socket().getLocalPort();
-                    }
-                }
-            }
-            return 0;
+        public RawSyslogEvent(final byte[] rawMessage, final String sender) {
+            this.rawMessage = rawMessage;
+            this.sender = sender;
         }
 
         @Override
-        public void stop() {
-            stopped = true;
-            selector.wakeup();
+        public byte[] getData() {
+            return this.rawMessage;
         }
 
         @Override
-        public void close() {
-            executor.shutdown();
-            try {
-                // Wait a while for existing tasks to terminate
-                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
-                    executor.shutdownNow();
-                }
-            } catch (InterruptedException ie) {
-                // (Re-)Cancel if current thread also interrupted
-                executor.shutdownNow();
-                // Preserve interrupt status
-                Thread.currentThread().interrupt();
-            }
-            for(SelectionKey key : selector.keys()){
-                IOUtils.closeQuietly(key.channel());
-            }
-            IOUtils.closeQuietly(selector);
-        }
-
-        public void completeConnection(SelectionKey key) {
-            // connection is done. Return the buffer to the pool
-            try {
-                SocketChannelAttachment attachment = (SocketChannelAttachment) 
key.attachment();
-                bufferPool.put(attachment.getByteBuffer());
-            } catch (InterruptedException e) {
-                // nothing to do here
-            }
-            currentConnections.decrementAndGet();
-        }
-
-        public void addBackForSelection(SelectionKey key) {
-            keyQueue.offer(key);
-            selector.wakeup();
-        }
-
-    }
-
-    /**
-     * Reads from the given SocketChannel into the provided buffer. If data is 
read then the buffer is queued for
-     * processing, otherwise the buffer is returned to the buffer pool.
-     */
-    private static class SocketChannelHandler implements Runnable {
-
-        private final SelectionKey key;
-        private final SocketChannelDispatcher dispatcher;
-        private final BlockingQueue<RawSyslogEvent> syslogEvents;
-        private final ProcessorLog logger;
-        private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
-
-        public SocketChannelHandler(final SelectionKey key, final 
SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> 
syslogEvents, final ProcessorLog logger) {
-            this.key = key;
-            this.dispatcher = dispatcher;
-            this.syslogEvents = syslogEvents;
-            this.logger = logger;
+        public String getSender() {
+            return this.sender;
         }
 
         @Override
-        public void run() {
-            boolean eof = false;
-            SocketChannel socketChannel = null;
-
-            try {
-                int bytesRead;
-                socketChannel = (SocketChannel) key.channel();
-
-                SocketChannelAttachment attachment = (SocketChannelAttachment) 
key.attachment();
-                ByteBuffer socketBuffer = attachment.getByteBuffer();
-
-                // read until the buffer is full
-                while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
-                    // prepare byte buffer for reading
-                    socketBuffer.flip();
-                    // mark the current position as start, in case of partial 
message read
-                    socketBuffer.mark();
-
-                    // get total bytes in buffer
-                    int total = socketBuffer.remaining();
-                    // go through the buffer looking for the end of each 
message
-                    currBytes.reset();
-                    for (int i = 0; i < total; i++) {
-                        // NOTE: For higher throughput, the looking for \n and 
copying into the byte
-                        // stream could be improved
-                        // Pull data out of buffer and cram into byte array
-                        byte currByte = socketBuffer.get();
-                        currBytes.write(currByte);
-
-                        // check if at end of a message
-                        if (currByte == '\n') {
-                            String sender = 
socketChannel.socket().getInetAddress().toString();
-                            // queue the raw event blocking until space is 
available, reset the buffer
-                            syslogEvents.put(new 
RawSyslogEvent(currBytes.toByteArray(), sender));
-                            currBytes.reset();
-                            // Mark this as the start of the next message
-                            socketBuffer.mark();
-                        }
-                    }
-                    // Preserve bytes in buffer for next call to run
-                    // NOTE: This code could benefit from the  two ByteBuffer 
read calls to avoid
-                    //  this compact for higher throughput
-                    socketBuffer.reset();
-                    socketBuffer.compact();
-                    logger.debug("done handling SocketChannel");
-                }
-                // Check for closed socket
-                if( bytesRead < 0 ){
-                    eof = true;
-                }
-            } catch (ClosedByInterruptException | InterruptedException e) {
-                logger.debug("read loop interrupted, closing connection");
-                // Treat same as closed socket
-                eof = true;
-            } catch (IOException e) {
-                logger.error("Error reading from channel due to {}", new 
Object[] {e.getMessage()}, e);
-                // Treat same as closed socket
-                eof = true;
-            } finally {
-                if(eof == true) {
-                    IOUtils.closeQuietly(socketChannel);
-                    dispatcher.completeConnection(key);
-                } else {
-                    dispatcher.addBackForSelection(key);
-                }
-            }
+        public ChannelResponder getResponder() {
+            return null;
         }
     }
 
     /**
-     * Wraps a SocketChannel with an SSLSocketChannel for receiving messages 
over TLS.
+     * EventFactory implementation for RawSyslogEvent.
      */
-    private static class SSLSocketChannelHandler implements Runnable {
-
-        private final SelectionKey key;
-        private final SocketChannelDispatcher dispatcher;
-        private final BlockingQueue<RawSyslogEvent> syslogEvents;
-        private final ProcessorLog logger;
-        private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
-
-        public SSLSocketChannelHandler(final SelectionKey key, final 
SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> 
syslogEvents, final ProcessorLog logger) {
-            this.key = key;
-            this.dispatcher = dispatcher;
-            this.syslogEvents = syslogEvents;
-            this.logger = logger;
-        }
+    private static class RawSyslogEventFactory implements 
EventFactory<RawSyslogEvent> {
 
         @Override
-        public void run() {
-            boolean eof = false;
-            SSLSocketChannel sslSocketChannel = null;
-            try {
-                int bytesRead;
-                final SocketChannel socketChannel = (SocketChannel) 
key.channel();
-                final SocketChannelAttachment attachment = 
(SocketChannelAttachment) key.attachment();
-
-                // wrap the SocketChannel with an SSLSocketChannel using the 
SSLEngine from the attachment
-                sslSocketChannel = new 
SSLSocketChannel(attachment.getSslEngine(), socketChannel, false);
-
-                // SSLSocketChannel deals with byte[] so ByteBuffer isn't used 
here, but we'll use the size to create a new byte[]
-                final ByteBuffer socketBuffer = attachment.getByteBuffer();
-                byte[] socketBufferArray = new byte[socketBuffer.limit()];
-
-                // read until no more data
-                while ((bytesRead = sslSocketChannel.read(socketBufferArray)) 
> 0) {
-                    // go through the buffer looking for the end of each 
message
-                    for (int i = 0; i < bytesRead; i++) {
-                        final byte currByte = socketBufferArray[i];
-                        currBytes.write(currByte);
-
-                        // check if at end of a message
-                        if (currByte == '\n') {
-                            final String sender = 
socketChannel.socket().getInetAddress().toString();
-                            // queue the raw event blocking until space is 
available, reset the temporary buffer
-                            syslogEvents.put(new 
RawSyslogEvent(currBytes.toByteArray(), sender));
-                            currBytes.reset();
-                        }
-                    }
-                    logger.debug("done handling SocketChannel");
-                }
-
-                // Check for closed socket
-                if( bytesRead < 0 ){
-                    eof = true;
-                }
-            } catch (ClosedByInterruptException | InterruptedException e) {
-                logger.debug("read loop interrupted, closing connection");
-                // Treat same as closed socket
-                eof = true;
-            } catch (IOException e) {
-                logger.error("Error reading from channel due to {}", new 
Object[] {e.getMessage()}, e);
-                // Treat same as closed socket
-                eof = true;
-            } finally {
-                if(eof == true) {
-                    IOUtils.closeQuietly(sslSocketChannel);
-                    dispatcher.completeConnection(key);
-                } else {
-                    dispatcher.addBackForSelection(key);
-                }
+        public RawSyslogEvent create(byte[] data, Map<String, String> 
metadata, final ChannelResponder responder) {
+            String sender = null;
+            if (metadata != null && 
metadata.containsKey(EventFactory.SENDER_KEY)) {
+                sender = metadata.get(EventFactory.SENDER_KEY);
             }
+            return new RawSyslogEvent(data, sender);
         }
     }
 
-    static void logMaxBufferWarning(final ProcessorLog logger, int 
maxBufferSize, int actualReceiveBufSize) {
-        logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize 
+ " bytes but could only set to "
-                + actualReceiveBufSize + "bytes. You may want to consider 
changing the Operating System's "
-                + "maximum receive buffer");
-    }
-
-    // Wrapper class to pass around the raw message and the host/ip that sent 
it
-    static class RawSyslogEvent {
-
-        final byte[] rawMessage;
-        final String sender;
-
-        public RawSyslogEvent(byte[] rawMessage, String sender) {
-            this.rawMessage = rawMessage;
-            this.sender = sender;
-        }
-
-        public byte[] getRawMessage() {
-            return this.rawMessage;
-        }
-
-        public String getSender() {
-            return this.sender;
-        }
-
-    }
-
-    // Wrapper class so we can attach a buffer and/or an SSLEngine to the 
selector key
-    private static class SocketChannelAttachment {
-
-        private final ByteBuffer byteBuffer;
-        private final SSLEngine sslEngine;
-
-        public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine 
sslEngine) {
-            this.byteBuffer = byteBuffer;
-            this.sslEngine = sslEngine;
-        }
-
-        public ByteBuffer getByteBuffer() {
-            return byteBuffer;
-        }
-
-        public SSLEngine getSslEngine() {
-            return sslEngine;
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
index 1490cc2..90fa816 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java
@@ -46,9 +46,9 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import 
org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
+import org.apache.nifi.processors.standard.syslog.SyslogEvent;
+import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.apache.nifi.stream.io.StreamUtils;
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
index 2089956..e555a0c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
@@ -35,7 +35,7 @@ import 
org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.ObjectHolder;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
new file mode 100644
index 0000000..e877ea2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.event;
+
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.nio.channels.SocketChannel;
+
+/**
+ * A RELP event which adds the transaction number and command to the 
StandardEvent.
+ */
+public class RELPEvent extends StandardEvent<SocketChannel> {
+
+    private final long txnr;
+    private final String command;
+
+    public RELPEvent(final String sender, final byte[] data, final 
ChannelResponder<SocketChannel> responder, final long txnr, final String 
command) {
+        super(sender, data, responder);
+        this.txnr = txnr;
+        this.command = command;
+    }
+
+    public long getTxnr() {
+        return txnr;
+    }
+
+    public String getCommand() {
+        return command;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
new file mode 100644
index 0000000..22eba01
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.event;
+
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.util.Map;
+
+/**
+ * An EventFactory implementation to create RELPEvents.
+ */
+public class RELPEventFactory implements EventFactory<RELPEvent> {
+
+    @Override
+    public RELPEvent create(final byte[] data, final Map<String, String> 
metadata, final ChannelResponder responder) {
+        final long txnr = Long.valueOf(metadata.get(RELPMetadata.TXNR_KEY));
+        final String command = metadata.get(RELPMetadata.COMMAND_KEY);
+        final String sender = metadata.get(EventFactory.SENDER_KEY);
+        return new RELPEvent(sender, data, responder, txnr, command);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
new file mode 100644
index 0000000..88051c0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.event;
+
+/**
+ * Metadata keys for RELP.
+ */
+public interface RELPMetadata {
+
+    String TXNR_KEY = "relp.txnr";
+    String COMMAND_KEY = "relp.command";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
new file mode 100644
index 0000000..cc7fa28
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * Decodes a RELP frame by maintaining a state based on each byte that has 
been processed. This class
+ * should not be shared by multiple threads.
+ */
+public class RELPDecoder {
+
+    static final Logger logger = LoggerFactory.getLogger(RELPDecoder.class);
+
+    private RELPFrame.Builder frameBuilder;
+    private RELPState currState = RELPState.TXNR;
+
+    private final Charset charset;
+    private final ByteArrayOutputStream currBytes;
+
+    /**
+     * @param charset the charset to decode bytes from the RELP frame
+     */
+    public RELPDecoder(final Charset charset) {
+        this(charset, new ByteArrayOutputStream(4096));
+    }
+
+    /**
+     *
+     * @param charset the charset to decode bytes from the RELP frame
+     * @param buffer a buffer to use while processing the bytes
+     */
+    public RELPDecoder(final Charset charset, final ByteArrayOutputStream 
buffer) {
+        this.charset = charset;
+        this.currBytes = buffer;
+        this.frameBuilder = new RELPFrame.Builder();
+    }
+
+    /**
+     * Resets this decoder back to it's initial state.
+     */
+    public void reset() {
+        frameBuilder = new RELPFrame.Builder();
+        currState = RELPState.TXNR;
+        currBytes.reset();
+    }
+
+    /**
+     * Process the next byte from the channel, updating the builder and state 
accordingly.
+     *
+     * @param currByte the next byte to process
+     * @preturn true if a frame is ready to be retrieved, false otherwise
+     */
+    public boolean process(final byte currByte) throws RELPFrameException {
+        try {
+            switch (currState) {
+                case TXNR:
+                    processTXNR(currByte);
+                    break;
+                case COMMAND:
+                    processCOMMAND(currByte);
+                    break;
+                case LENGTH:
+                    processLENGTH(currByte);
+                    // if jumped from length to trailer we need to return true 
here
+                    // because there might not be another byte to process
+                    if (currState == RELPState.TRAILER) {
+                        return true;
+                    }
+                    break;
+                case DATA:
+                    processDATA(currByte);
+                    break;
+                case TRAILER:
+                    return true;
+                default:
+                    break;
+            }
+            return false;
+        } catch (Exception e) {
+            throw new RELPFrameException("Error decoding RELP frame: " + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the decoded frame and resets the decoder for the next frame.
+     * This method should be called after checking isComplete().
+     *
+     * @return the RELPFrame that was decoded
+     */
+    public RELPFrame getFrame() throws RELPFrameException {
+        if (currState != RELPState.TRAILER) {
+            throw new RELPFrameException("Must be at the trailer of a frame");
+        }
+
+        try {
+            final RELPFrame frame = frameBuilder.build();
+            processTRAILER(RELPFrame.DELIMITER);
+            return frame;
+        } catch (Exception e) {
+            throw new RELPFrameException("Error decoding RELP frame: " + 
e.getMessage(), e);
+        }
+    }
+
+
+    private void processTXNR(final byte b) {
+        if (b == RELPFrame.SEPARATOR) {
+            if (currBytes.size() > 0) {
+                final long txnr = Long.parseLong(new 
String(currBytes.toByteArray(), charset));
+                frameBuilder.txnr(txnr);
+                logger.debug("Transaction number is {}", new Object[]{txnr});
+
+                currBytes.reset();
+                currState = RELPState.COMMAND;
+            }
+        } else {
+            currBytes.write(b);
+        }
+    }
+
+    private void processCOMMAND(final byte b) {
+        if (b == RELPFrame.SEPARATOR) {
+            final String command = new String(currBytes.toByteArray(), 
charset);
+            frameBuilder.command(command);
+            logger.debug("Command is {}", new Object[] {command});
+
+            currBytes.reset();
+            currState = RELPState.LENGTH;
+        } else {
+            currBytes.write(b);
+        }
+    }
+
+    private void processLENGTH(final byte b) {
+        if (b == RELPFrame.SEPARATOR || (currBytes.size() > 0 && b == 
RELPFrame.DELIMITER)) {
+            final int dataLength = Integer.parseInt(new 
String(currBytes.toByteArray(), charset));
+            frameBuilder.dataLength(dataLength);
+            logger.debug("Length is {}", new Object[] {dataLength});
+
+            currBytes.reset();
+
+            // if at a separator then data is going to follow, but if at a 
separator there is no data
+            if (b == RELPFrame.SEPARATOR) {
+                currState = RELPState.DATA;
+            } else {
+                frameBuilder.data(new byte[0]);
+                currState = RELPState.TRAILER;
+            }
+        } else {
+            currBytes.write(b);
+        }
+    }
+
+    private void processDATA(final byte b) {
+        currBytes.write(b);
+        logger.trace("Data size is {}", new Object[] {currBytes.size()});
+
+        if (currBytes.size() >= frameBuilder.dataLength) {
+            final byte[] data = currBytes.toByteArray();
+            frameBuilder.data(data);
+            logger.debug("Reached expected data size of {}", new Object[] 
{frameBuilder.dataLength});
+
+            currBytes.reset();
+            currState = RELPState.TRAILER;
+        }
+    }
+
+    private void processTRAILER(final byte b) {
+        if (b != RELPFrame.DELIMITER) {
+            logger.warn("Expected RELP trailing LF, but found another byte");
+        }
+        currBytes.reset();
+        frameBuilder.reset();
+        currState = RELPState.TXNR;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java
new file mode 100644
index 0000000..a36588a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
+/**
+ * Encodes a RELPFrame into raw bytes using the given charset.
+ */
+public class RELPEncoder {
+
+    private final Charset charset;
+
+    public RELPEncoder(final Charset charset) {
+        this.charset = charset;
+    }
+
+    public Charset getCharset() {
+        return charset;
+    }
+
+    public byte[] encode(final RELPFrame frame) {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+        // write transaction number followed by separator
+        byte[] txnr = String.format("%s", frame.getTxnr()).getBytes(charset);
+        buffer.write(txnr, 0, txnr.length);
+        buffer.write(RELPFrame.SEPARATOR);
+
+        // write the command followed by separator
+        byte[] command = frame.getCommand().getBytes(charset);
+        buffer.write(command, 0, command.length);
+        buffer.write(RELPFrame.SEPARATOR);
+
+        // write the data length
+        byte[] dataLength = String.format("%s", 
frame.getDataLength()).getBytes(charset);
+        buffer.write(dataLength, 0, dataLength.length);
+
+        // if data to write then put a separator and write the data
+        if (frame.getDataLength() > 0) {
+            buffer.write(RELPFrame.SEPARATOR);
+            buffer.write(frame.getData(), 0, frame.getDataLength());
+        }
+
+        // write the end of the frame
+        buffer.write(RELPFrame.DELIMITER);
+
+        return buffer.toByteArray();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java
new file mode 100644
index 0000000..d763dda
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A RELP frame received from a channel.
+ */
+public class RELPFrame {
+
+    public static final byte DELIMITER = 10;
+    public static final byte SEPARATOR = 32;
+
+    private final long txnr;
+    private final int dataLength;
+    private final String command;
+    private final byte[] data;
+
+    private RELPFrame(final Builder builder) {
+        this.txnr = builder.txnr;
+        this.dataLength = builder.dataLength;
+        this.command = builder.command;
+        this.data = builder.data == null ? new byte[0] : builder.data;
+
+        if (txnr < 0 || dataLength < 0 || command == null || 
StringUtils.isBlank(command)
+                || data == null || dataLength != data.length) {
+            throw new RELPFrameException("Invalid Frame");
+        }
+    }
+
+    public long getTxnr() {
+        return txnr;
+    }
+
+    public int getDataLength() {
+        return dataLength;
+    }
+
+    public String getCommand() {
+        return command;
+    }
+
+    //NOTE: consider making a copy here if we want to be truly be immutable
+    public byte[] getData() {
+        return data;
+    }
+
+
+    /**
+     * Builder for a RELPFrame.
+     */
+    public static class Builder {
+
+        long txnr;
+        int dataLength;
+        String command;
+        byte[] data;
+
+        public Builder() {
+            reset();
+        }
+
+        public void reset() {
+            txnr = -1;
+            dataLength = -1;
+            command = null;
+            data = null;
+        }
+
+        public Builder txnr(final long txnr) {
+            this.txnr = txnr;
+            return this;
+        }
+
+        public Builder dataLength(final int dataLength) {
+            this.dataLength = dataLength;
+            return this;
+        }
+
+        public Builder command(final String command) {
+            this.command = command;
+            return this;
+        }
+
+        public Builder data(final byte[] data) {
+            this.data = data;
+            return this;
+        }
+
+        public RELPFrame build() {
+            return new RELPFrame(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java
new file mode 100644
index 0000000..dab01e5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+/**
+ * Represents an error encountered when decoding RELP frames.
+ */
+public class RELPFrameException extends RuntimeException {
+
+    public RELPFrameException(String message) {
+        super(message);
+    }
+
+    public RELPFrameException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java
new file mode 100644
index 0000000..aabecfb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+/**
+ * The parts of a RELP frame.
+ */
+public enum RELPState {
+
+    TXNR,
+    COMMAND,
+    LENGTH,
+    DATA,
+    TRAILER
+
+}

Reply via email to