http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
new file mode 100644
index 0000000..5af316e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Encapsulates the logic to handle a RELPFrame once it has been read from the channel.
+ *
+ * "open" and "close" commands are answered immediately through the supplied responder;
+ * every other command is turned into an event and placed on the event queue.
+ */
+public class RELPFrameHandler<E extends Event<SocketChannel>> {
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    private final Charset charset;
+    private final EventFactory<E> eventFactory;
+    private final BlockingQueue<E> events;
+    private final SelectionKey key;
+    private final AsyncChannelDispatcher dispatcher;
+    private final RELPEncoder encoder;
+
+    /**
+     * @param selectionKey the key handed to the dispatcher when a "close" command is received
+     * @param charset the charset used to parse offers and to build the response encoder
+     * @param eventFactory creates the events for commands other than "open" and "close"
+     * @param events the queue that created events are put on
+     * @param dispatcher the dispatcher asked to complete the connection on "close"
+     */
+    public RELPFrameHandler(final SelectionKey selectionKey,
+                            final Charset charset,
+                            final EventFactory<E> eventFactory,
+                            final BlockingQueue<E> events,
+                            final AsyncChannelDispatcher dispatcher) {
+        this.key = selectionKey;
+        this.charset = charset;
+        this.eventFactory = eventFactory;
+        this.events = events;
+        this.dispatcher = dispatcher;
+        this.encoder = new RELPEncoder(charset);
+    }
+
+    /**
+     * Handles a single frame.
+     *
+     * @param frame the frame to handle
+     * @param responder used to answer "open"/"close" directly, and attached to the event otherwise
+     * @param sender the sender recorded in the metadata of a queued event
+     *
+     * @throws IOException declared for the immediate response path
+     * @throws InterruptedException if interrupted while waiting for space on the event queue
+     */
+    public void handle(final RELPFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
+            throws IOException, InterruptedException {
+
+        final String command = frame.getCommand();
+
+        // respond to open and close commands immediately, create and queue an event for everything else
+        if (CMD_OPEN.equals(command)) {
+            final Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
+            respondImmediately(responder, RELPResponse.open(frame.getTxnr(), offers));
+        } else if (CMD_CLOSE.equals(command)) {
+            respondImmediately(responder, RELPResponse.ok(frame.getTxnr()));
+            dispatcher.completeConnection(key);
+        } else {
+            // sender is already a String, so it is passed through as-is
+            final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
+            metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr()));
+            metadata.put(RELPMetadata.COMMAND_KEY, command);
+
+            // queue the raw event, blocking until space is available
+            final E event = eventFactory.create(frame.getData(), metadata, responder);
+            events.put(event);
+        }
+    }
+
+    /**
+     * Wraps the given RELP response with this handler's encoder and sends it through the responder.
+     */
+    private void respondImmediately(final ChannelResponder<SocketChannel> responder, final RELPResponse relpResponse)
+            throws IOException {
+        final ChannelResponse response = new RELPChannelResponse(encoder, relpResponse);
+        responder.addResponse(response);
+        responder.respond();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
new file mode 100644
index 0000000..8bbd116
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import 
org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
+import 
org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
+import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A RELP implementation of SSLSocketChannelHandler: each byte handed to processBuffer is fed
+ * to a RELPDecoder, and every completed frame is passed on to a RELPFrameHandler.
+ */
+public class RELPSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
+
+    private RELPDecoder decoder;
+    private RELPFrameHandler<E> frameHandler;
+
+    public RELPSSLSocketChannelHandler(final SelectionKey key,
+                                       final AsyncChannelDispatcher dispatcher,
+                                       final Charset charset,
+                                       final EventFactory<E> eventFactory,
+                                       final BlockingQueue<E> events,
+                                       final ProcessorLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+        this.decoder = new RELPDecoder(charset);
+        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher);
+    }
+
+    /**
+     * Decodes the first bytesRead bytes of buffer and handles every frame completed along the way.
+     * A RELPFrameException is logged and the connection is completed through the dispatcher.
+     */
+    @Override
+    protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
+                                 final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
+
+        final InetAddress remoteAddress = socketChannel.socket().getInetAddress();
+        try {
+            int offset = 0;
+            while (offset < bytesRead) {
+                final byte next = buffer[offset++];
+
+                // keep feeding bytes until the decoder reports a complete frame
+                if (!decoder.process(next)) {
+                    continue;
+                }
+
+                final RELPFrame completed = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {completed.getTxnr(), completed.getCommand()});
+
+                final SSLSocketChannelResponder sslResponder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
+                frameHandler.handle(completed, sslResponder, remoteAddress.toString());
+            }
+
+            logger.debug("Done processing buffer");
+
+        } catch (final RELPFrameException frameError) {
+            logger.error("Error reading RELP frames due to {}", new Object[] {frameError.getMessage()} , frameError);
+            // an invalid frame or bad data leaves the decoder in a corrupted state,
+            // so close the connection and make the client re-establish it
+            dispatcher.completeConnection(key);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
new file mode 100644
index 0000000..0bb8185
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import 
org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
+import 
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
+import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Extends the StandardSocketChannelHandler to decode bytes into RELP frames and pass each
+ * completed frame to a RELPFrameHandler.
+ */
+public class RELPSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
+
+    private RELPDecoder decoder;
+    private RELPFrameHandler<E> frameHandler;
+
+    public RELPSocketChannelHandler(final SelectionKey key,
+                                    final AsyncChannelDispatcher dispatcher,
+                                    final Charset charset,
+                                    final EventFactory<E> eventFactory,
+                                    final BlockingQueue<E> events,
+                                    final ProcessorLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+        this.decoder = new RELPDecoder(charset);
+        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher);
+    }
+
+    /**
+     * Reads every byte remaining in socketBuffer, handles each frame completed along the way and
+     * marks the buffer after each handled frame. A RELPFrameException is logged and the
+     * connection is completed through the dispatcher.
+     */
+    @Override
+    protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
+            throws InterruptedException, IOException {
+
+        final InetAddress remoteAddress = socketChannel.socket().getInetAddress();
+
+        // number of bytes available when processing starts
+        final int total = socketBuffer.remaining();
+
+        try {
+            for (int consumed = 0; consumed < total; consumed++) {
+                final byte next = socketBuffer.get();
+
+                // keep feeding bytes until the decoder reports a complete frame
+                if (!decoder.process(next)) {
+                    continue;
+                }
+
+                final RELPFrame completed = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                        new Object[] {completed.getTxnr(), completed.getCommand()});
+
+                final SocketChannelResponder channelResponder = new SocketChannelResponder(socketChannel);
+                frameHandler.handle(completed, channelResponder, remoteAddress.toString());
+                socketBuffer.mark();
+            }
+
+            logger.debug("Done processing buffer");
+
+        } catch (final RELPFrameException frameError) {
+            logger.error("Error reading RELP frames due to {}", new Object[] {frameError.getMessage()}, frameError);
+            // an invalid frame or bad data leaves the decoder in a corrupted state,
+            // so close the connection and make the client re-establish it
+            dispatcher.completeConnection(key);
+        }
+    }
+
+    /**
+     * Not used for anything in RELP since the decoder encapsulates the delimiter.
+     */
+    @Override
+    public byte getDelimiter() {
+        return RELPFrame.DELIMITER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
new file mode 100644
index 0000000..2299a6b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Default factory for creating RELP socket channel handlers, in plain and SSL flavours.
+ */
+public class RELPSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
+
+    /**
+     * @return a new RELPSocketChannelHandler built from the given arguments
+     */
+    @Override
+    public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
+                                           final AsyncChannelDispatcher dispatcher,
+                                           final Charset charset,
+                                           final EventFactory<E> eventFactory,
+                                           final BlockingQueue<E> events,
+                                           final ProcessorLog logger) {
+        final ChannelHandler<E, AsyncChannelDispatcher> plainHandler =
+                new RELPSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
+        return plainHandler;
+    }
+
+    /**
+     * @return a new RELPSSLSocketChannelHandler built from the given arguments
+     */
+    @Override
+    public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
+                                              final AsyncChannelDispatcher dispatcher,
+                                              final Charset charset,
+                                              final EventFactory<E> eventFactory,
+                                              final BlockingQueue<E> events,
+                                              final ProcessorLog logger) {
+        final ChannelHandler<E, AsyncChannelDispatcher> sslHandler =
+                new RELPSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
+        return sslHandler;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java
new file mode 100644
index 0000000..ace73cf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.response;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
+
+/**
+ * A ChannelResponse backed by a RELPResponse: the response is turned into a RELPFrame
+ * and the encoded bytes of that frame are returned.
+ */
+public class RELPChannelResponse implements ChannelResponse {
+
+    private final RELPEncoder encoder;
+    private final RELPResponse response;
+
+    /**
+     * @param encoder supplies the charset for building the frame and encodes it
+     * @param response the response to be sent
+     */
+    public RELPChannelResponse(final RELPEncoder encoder, final RELPResponse response) {
+        this.encoder = encoder;
+        this.response = response;
+    }
+
+    /**
+     * @return the encoded frame for the wrapped response
+     */
+    @Override
+    public byte[] toByteArray() {
+        return encoder.encode(response.toFrame(encoder.getCharset()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
new file mode 100644
index 0000000..f543f26
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.response;
+
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The data portion of a RELPFrame for a response:
+ *
+ * RSP-CODE [SP HUMANMSG] LF [CMDDATA]
+ *
+ */
+public class RELPResponse {
+
+    public static final int OK = 200;
+    public static final int ERROR = 500;
+
+    public static final String RSP_CMD = "rsp";
+
+    private final long txnr;
+    private final int code;
+    private final String message;
+    private final String data;
+
+    /**
+     * Creates a response with a code only, without a message or command data.
+     *
+     * @param txnr the transaction number being responded to
+     * @param code the response code
+     */
+    public RELPResponse(final long txnr, final int code) {
+        this(txnr, code, null, null);
+    }
+
+    /**
+     * @param txnr the transaction number being responded to
+     * @param code the response code
+     * @param message the human readable message, left out of the frame when null or empty
+     * @param data the command data, left out of the frame when null
+     */
+    public RELPResponse(final long txnr, final int code, final String message, final String data) {
+        this.txnr = txnr;
+        this.code = code;
+        this.message = message;
+        this.data = data;
+    }
+
+    /**
+     * Creates a RELPFrame where the data portion will contain this response.
+     *
+     * @param charset the character set to encode the response
+     *
+     * @return a RELPFrame for this response
+     */
+    public RELPFrame toFrame(final Charset charset) {
+        final StringBuilder builder = new StringBuilder();
+        builder.append(code);
+
+        if (message != null && !message.isEmpty()) {
+            builder.append((char)RELPFrame.SEPARATOR);
+            builder.append(message);
+        }
+
+        if (data != null) {
+            builder.append((char)RELPFrame.DELIMITER);
+            builder.append(data);
+        }
+
+        // named differently from the 'data' field so the two cannot be confused
+        final byte[] payload = builder.toString().getBytes(charset);
+
+        return new RELPFrame.Builder()
+                .txnr(txnr).command(RSP_CMD)
+                .dataLength(payload.length).data(payload)
+                .build();
+    }
+
+    /**
+     * Utility method to create a response to an open request.
+     *
+     * Each offer is written as its key, followed by '=' and the value when the value is not null;
+     * offers are separated by the frame delimiter.
+     *
+     * @param txnr the transaction number of the open request
+     * @param offers the accepted offers
+     *
+     * @return the RELPResponse for the given open request
+     */
+    public static RELPResponse open(final long txnr, final Map<String,String> offers) {
+        final StringBuilder sb = new StringBuilder();
+        boolean first = true;
+        for (final Map.Entry<String, String> entry : offers.entrySet()) {
+            if (!first) {
+                sb.append((char)RELPFrame.DELIMITER);
+            }
+            first = false;
+
+            sb.append(entry.getKey());
+
+            if (entry.getValue() != null) {
+                sb.append('=');
+                sb.append(entry.getValue());
+            }
+        }
+
+        return new RELPResponse(txnr, OK, "OK", sb.toString());
+    }
+
+    /**
+     * Utility method to create a default "OK" response.
+     *
+     * @param txnr the transaction number being responded to
+     *
+     * @return a RELPResponse with a 200 code and a message of "OK"
+     */
+    public static RELPResponse ok(final long txnr) {
+        return new RELPResponse(txnr, OK, "OK", null);
+    }
+
+    /**
+     * Utility method to create a default "ERROR" response.
+     *
+     * @param txnr the transaction number being responded to
+     *
+     * @return a RELPResponse with a 500 code and a message of "ERROR"
+     */
+    public static RELPResponse error(final long txnr) {
+        return new RELPResponse(txnr, ERROR, "ERROR", null);
+    }
+
+
+    /**
+     * Parses the provided data into a Map of offers.
+     *
+     * The data is split on the frame delimiter and each piece is split on the first '='.
+     * A piece without '=' is stored with its name as the value. Pieces whose name is empty
+     * (blank lines, or lines starting with '=') are skipped.
+     *
+     * @param data the data portion of a RELPFrame for an "open" command
+     * @param charset the charset to decode the data
+     *
+     * @return a Map of offers, or an empty Map if no data is provided
+     */
+    public static Map<String,String> parseOffers(final byte[] data, final Charset charset) {
+        final Map<String, String> offers = new HashMap<>();
+        if (data == null || data.length == 0) {
+            return offers;
+        }
+
+        final String dataStr = new String(data, charset);
+        final String[] splits = dataStr.split("[" + (char)RELPFrame.DELIMITER + "]");
+
+        for (final String split : splits) {
+            final String[] fields = split.split( "=", 2);
+
+            // an offer without a name carries no information, so do not register it
+            if (fields[0].isEmpty()) {
+                continue;
+            }
+
+            if (fields.length > 1 ) {
+                offers.put(fields[0], fields[1]);
+            } else {
+                offers.put(fields[0], fields[0]);
+            }
+        }
+
+        return offers;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java
new file mode 100644
index 0000000..e7a13f1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.syslog;
+
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+
+/**
+ * FlowFile Attributes for each Syslog message.
+ *
+ * Each constant carries the attribute name returned by {@link #key()}.
+ */
+public enum SyslogAttributes implements FlowFileAttributeKey {
+
+    PRIORITY("syslog.priority"),
+    SEVERITY("syslog.severity"),
+    FACILITY("syslog.facility"),
+    VERSION("syslog.version"),
+    TIMESTAMP("syslog.timestamp"),
+    HOSTNAME("syslog.hostname"),
+    SENDER("syslog.sender"),
+    BODY("syslog.body"),
+    VALID("syslog.valid"),
+    PROTOCOL("syslog.protocol"),
+    PORT("syslog.port");
+
+    // assigned once in the constructor; final keeps the enum constants immutable
+    private final String key;
+
+    SyslogAttributes(final String key) {
+        this.key = key;
+    }
+
+    /**
+     * @return the attribute name for this constant
+     */
+    @Override
+    public String key() {
+        return key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java
new file mode 100644
index 0000000..29243cd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.syslog;
+
+/**
+ * Encapsulates the parsed information for a single Syslog event.
+ *
+ * Instances are created through {@link Builder}. Every field is assigned exactly once,
+ * in the constructor, from the builder's current state. The raw message array is stored
+ * and returned by reference; it is not copied.
+ */
+public class SyslogEvent {
+
+    private final String priority;
+    private final String severity;
+    private final String facility;
+    private final String version;
+    private final String timeStamp;
+    private final String hostName;
+    private final String sender;
+    private final String msgBody;
+    private final String fullMessage;
+    private final byte[] rawMessage;
+    private final boolean valid;
+
+    /**
+     * Copies the current state of the given builder into a new event.
+     *
+     * @param builder the builder holding the values for this event
+     */
+    private SyslogEvent(final Builder builder) {
+        this.priority = builder.priority;
+        this.severity = builder.severity;
+        this.facility = builder.facility;
+        this.version = builder.version;
+        this.timeStamp = builder.timeStamp;
+        this.hostName = builder.hostName;
+        this.sender = builder.sender;
+        this.msgBody = builder.msgBody;
+        this.fullMessage = builder.fullMessage;
+        this.rawMessage = builder.rawMessage;
+        this.valid = builder.valid;
+    }
+
+    /**
+     * @return the priority set on the builder, or null if none was set
+     */
+    public String getPriority() {
+        return priority;
+    }
+
+    /**
+     * @return the severity set on the builder, or null if none was set
+     */
+    public String getSeverity() {
+        return severity;
+    }
+
+    /**
+     * @return the facility set on the builder, or null if none was set
+     */
+    public String getFacility() {
+        return facility;
+    }
+
+    /**
+     * @return the version set on the builder, or null if none was set
+     */
+    public String getVersion() {
+        return version;
+    }
+
+    /**
+     * @return the timestamp set on the builder, or null if none was set
+     */
+    public String getTimeStamp() {
+        return timeStamp;
+    }
+
+    /**
+     * @return the host name set on the builder, or null if none was set
+     */
+    public String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * @return the sender set on the builder, or null if none was set
+     */
+    public String getSender() {
+        return sender;
+    }
+
+    /**
+     * @return the message body set on the builder, or null if none was set
+     */
+    public String getMsgBody() {
+        return msgBody;
+    }
+
+    /**
+     * @return the full message text set on the builder, or null if none was set
+     */
+    public String getFullMessage() {
+        return fullMessage;
+    }
+
+    /**
+     * @return the raw message bytes set on the builder (the same array instance, not a copy),
+     *         or null if none was set
+     */
+    public byte[] getRawMessage() {
+        return rawMessage;
+    }
+
+    /**
+     * @return the valid flag set on the builder; false if it was never set
+     */
+    public boolean isValid() {
+        return valid;
+    }
+
+    /**
+     * Mutable builder for {@link SyslogEvent}. Setters return the builder so calls can be
+     * chained; {@link #reset()} returns the builder to its initial, empty state so it can
+     * be reused.
+     */
+    public static final class Builder {
+        private String priority;
+        private String severity;
+        private String facility;
+        private String version;
+        private String timeStamp;
+        private String hostName;
+        private String sender;
+        private String msgBody;
+        private String fullMessage;
+        private byte[] rawMessage;
+        private boolean valid;
+
+        /**
+         * Clears every value held by this builder, including the raw message bytes,
+         * so that nothing from a previous event carries over into the next build.
+         */
+        public void reset() {
+            this.priority = null;
+            this.severity = null;
+            this.facility = null;
+            this.version = null;
+            this.timeStamp = null;
+            this.hostName = null;
+            this.sender = null;
+            this.msgBody = null;
+            this.fullMessage = null;
+            this.rawMessage = null;
+            this.valid = false;
+        }
+
+        public Builder priority(String priority) {
+            this.priority = priority;
+            return this;
+        }
+
+        public Builder severity(String severity) {
+            this.severity = severity;
+            return this;
+        }
+
+        public Builder facility(String facility) {
+            this.facility = facility;
+            return this;
+        }
+
+        public Builder version(String version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder timestamp(String timestamp) {
+            this.timeStamp = timestamp;
+            return this;
+        }
+
+        public Builder hostname(String hostName) {
+            this.hostName = hostName;
+            return this;
+        }
+
+        public Builder sender(String sender) {
+            this.sender = sender;
+            return this;
+        }
+
+        public Builder msgBody(String msgBody) {
+            this.msgBody = msgBody;
+            return this;
+        }
+
+        public Builder fullMessage(String fullMessage) {
+            this.fullMessage = fullMessage;
+            return this;
+        }
+
+        public Builder rawMessage(byte[] rawMessage) {
+            this.rawMessage = rawMessage;
+            return this;
+        }
+
+        public Builder valid(boolean valid) {
+            this.valid = valid;
+            return this;
+        }
+
+        /**
+         * @return a new event populated from the builder's current values
+         */
+        public SyslogEvent build() {
+            return new SyslogEvent(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
new file mode 100644
index 0000000..b5dbf22
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.syslog;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.MatchResult;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parses a Syslog message from a ByteBuffer or byte array into a SyslogEvent instance.
+ *
+ * The Syslog regular expressions below were adapted from the Apache Flume project.
+ */
+public class SyslogParser {
+
+    /** Regular expression tried first against each message. */
+    public static final String SYSLOG_MSG_RFC5424_0 =
+            "(?:\\<(\\d{1,3})\\>)" + // priority
+                    "(?:(\\d)?\\s?)" + // version
+                    // yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp)
+                    "(?:" +
+                    "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
+                    "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
+                    "\\s" + // separator
+                    "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
+                    "\\s" + // separator
+                    "(.*)$"; // body
+
+    /** Regular expression tried second, when the first one does not match. */
+    public static final String SYSLOG_MSG_RFC3164_0 =
+            "(?:\\<(\\d{1,3})\\>)" + // priority
+                    "(?:(\\d)?\\s?)" + // version
+                    // stamp MMM d HH:mm:ss, single digit date has two spaces
+                    "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
+                    "\\s" + // separator
+                    "([\\w][\\w\\d\\.@-]*)" + // host
+                    "\\s(.*)$";  // body
+
+    /** Compiled message patterns, in the order in which they are tried. */
+    public static final Collection<Pattern> MESSAGE_PATTERNS;
+    static {
+        List<Pattern> patterns = new ArrayList<>();
+        patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0));
+        patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0));
+        MESSAGE_PATTERNS = Collections.unmodifiableList(patterns);
+    }
+
+    // capture group positions from the above message patterns;
+    // both patterns define exactly these five capturing groups
+    public static final int SYSLOG_PRIORITY_POS = 1;
+    public static final int SYSLOG_VERSION_POS = 2;
+    public static final int SYSLOG_TIMESTAMP_POS = 3;
+    public static final int SYSLOG_HOSTNAME_POS = 4;
+    public static final int SYSLOG_BODY_POS = 5;
+
+    // character set used to decode message bytes into text
+    private final Charset charset;
+
+    /**
+     * @param charset the character set used to decode message bytes
+     */
+    public SyslogParser(final Charset charset) {
+        this.charset = charset;
+    }
+
+    /**
+     *  Parses a SyslogEvent from a byte buffer, with no sender recorded.
+     *
+     * @param buffer a byte buffer containing a syslog message
+     * @return a SyslogEvent parsed from the byte array, or null if the buffer is null or empty
+     */
+    public SyslogEvent parseEvent(final ByteBuffer buffer) {
+        return parseEvent(buffer, null);
+    }
+
+    /**
+     *  Parses a SyslogEvent from a byte buffer.
+     *
+     * If the buffer's position is not zero the buffer is flipped first; the bytes from
+     * index zero up to the buffer's limit are then read as the message.
+     *
+     * @param buffer a byte buffer containing a syslog message
+     * @param sender the hostname of the syslog server that sent the message
+     * @return a SyslogEvent parsed from the byte array, or null if the buffer is null or empty
+     */
+    public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) {
+        if (buffer == null) {
+            return null;
+        }
+        if (buffer.position() != 0) {
+            buffer.flip();
+        }
+        final byte[] bytes = new byte[buffer.limit()];
+        buffer.get(bytes, 0, bytes.length);
+        return parseEvent(bytes, sender);
+    }
+
+    /**
+     * Parses a SyslogEvent from a byte array.
+     *
+     * A single trailing new line is excluded from the decoded text, while the raw message
+     * stored on the event is the given array as-is. When none of the patterns match, the
+     * returned event is marked invalid and carries only the full message, raw bytes and sender.
+     *
+     * @param bytes a byte array containing a syslog message
+     * @param sender the hostname of the syslog server that sent the message
+     * @return a SyslogEvent parsed from the byte array, or null if the array is null or empty
+     */
+    public SyslogEvent parseEvent(final byte[] bytes, final String sender) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+
+        // remove trailing new line before parsing
+        final int length = (bytes[bytes.length - 1] == '\n') ? bytes.length - 1 : bytes.length;
+
+        final String message = new String(bytes, 0, length, charset);
+
+        final SyslogEvent.Builder builder = new SyslogEvent.Builder()
+                .valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
+
+        for (final Pattern pattern : MESSAGE_PATTERNS) {
+            final Matcher matcher = pattern.matcher(message);
+            if (!matcher.matches()) {
+                continue;
+            }
+
+            final MatchResult res = matcher.toMatchResult();
+
+            // the priority group is mandatory in both patterns, so it is never null here;
+            // severity is the priority modulo 8 and facility is the priority divided by 8
+            final String priority = res.group(SYSLOG_PRIORITY_POS);
+            final int pri = Integer.parseInt(priority);
+            builder.priority(priority);
+            builder.severity(String.valueOf(pri % 8));
+            builder.facility(String.valueOf(pri / 8));
+
+            // optional groups yield null when they did not participate in the match
+            builder.version(res.group(SYSLOG_VERSION_POS));
+            builder.timestamp(res.group(SYSLOG_TIMESTAMP_POS));
+            builder.hostname(res.group(SYSLOG_HOSTNAME_POS));
+            builder.msgBody(res.group(SYSLOG_BODY_POS));
+
+            // first matching pattern wins
+            builder.valid(true);
+            break;
+        }
+
+        // either invalid w/original msg, or fully parsed event
+        return builder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
deleted file mode 100644
index 3d06dbe..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-/**
- * Encapsulates the parsed information for a single Syslog event.
- */
-public class SyslogEvent {
-
-    private final String priority;
-    private final String severity;
-    private final String facility;
-    private final String version;
-    private final String timeStamp;
-    private final String hostName;
-    private final String sender;
-    private final String msgBody;
-    private final String fullMessage;
-    private final byte[] rawMessage;
-    private final boolean valid;
-
-    private SyslogEvent(final Builder builder) {
-        this.priority = builder.priority;
-        this.severity = builder.severity;
-        this.facility = builder.facility;
-        this.version = builder.version;
-        this.timeStamp = builder.timeStamp;
-        this.hostName = builder.hostName;
-        this.sender = builder.sender;
-        this.msgBody = builder.msgBody;
-        this.fullMessage = builder.fullMessage;
-        this.rawMessage = builder.rawMessage;
-        this.valid = builder.valid;
-    }
-
-    public String getPriority() {
-        return priority;
-    }
-
-    public String getSeverity() {
-        return severity;
-    }
-
-    public String getFacility() {
-        return facility;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public String getTimeStamp() {
-        return timeStamp;
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public String getSender() {
-        return sender;
-    }
-
-    public String getMsgBody() {
-        return msgBody;
-    }
-
-    public String getFullMessage() {
-        return fullMessage;
-    }
-
-    public byte[] getRawMessage() {
-        return rawMessage;
-    }
-
-    public boolean isValid() {
-        return valid;
-    }
-
-    public static final class Builder {
-        private String priority;
-        private String severity;
-        private String facility;
-        private String version;
-        private String timeStamp;
-        private String hostName;
-        private String sender;
-        private String msgBody;
-        private String fullMessage;
-        private byte[] rawMessage;
-        private boolean valid;
-
-        public void reset() {
-            this.priority = null;
-            this.severity = null;
-            this.facility = null;
-            this.version = null;
-            this.timeStamp = null;
-            this.hostName = null;
-            this.sender = null;
-            this.msgBody = null;
-            this.fullMessage = null;
-            this.valid = false;
-        }
-
-        public Builder priority(String priority) {
-            this.priority = priority;
-            return this;
-        }
-
-        public Builder severity(String severity) {
-            this.severity = severity;
-            return this;
-        }
-
-        public Builder facility(String facility) {
-            this.facility = facility;
-            return this;
-        }
-
-        public Builder version(String version) {
-            this.version = version;
-            return this;
-        }
-
-        public Builder timestamp(String timestamp) {
-            this.timeStamp = timestamp;
-            return this;
-        }
-
-        public Builder hostname(String hostName) {
-            this.hostName = hostName;
-            return this;
-        }
-
-        public Builder sender(String sender) {
-            this.sender = sender;
-            return this;
-        }
-
-        public Builder msgBody(String msgBody) {
-            this.msgBody = msgBody;
-            return this;
-        }
-
-        public Builder fullMessage(String fullMessage) {
-            this.fullMessage = fullMessage;
-            return this;
-        }
-
-        public Builder rawMessage(byte[] rawMessage) {
-            this.rawMessage = rawMessage;
-            return this;
-        }
-
-        public Builder valid(boolean valid) {
-            this.valid = valid;
-            return this;
-        }
-
-        public SyslogEvent build() {
-            return new SyslogEvent(this);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
deleted file mode 100644
index fd59e5b..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.regex.MatchResult;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance.
- *
- * The Syslog regular expressions below were adapted from the Apache Flume 
project.
- */
-public class SyslogParser {
-
-    public static final String SYSLOG_MSG_RFC5424_0 =
-            "(?:\\<(\\d{1,3})\\>)" + // priority
-                    "(?:(\\d)?\\s?)" + // version
-      /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null 
stamp) */
-                    "(?:" +
-                    "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" +
-                    "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp
-                    "\\s" + // separator
-                    "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null)
-                    "\\s" + // separator
-                    "(.*)$"; // body
-
-    public static final String SYSLOG_MSG_RFC3164_0 =
-            "(?:\\<(\\d{1,3})\\>)" +
-                    "(?:(\\d)?\\s?)" + // version
-                    // stamp MMM d HH:mm:ss, single digit date has two spaces
-                    
"([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" +
-                    "\\s" + // separator
-                    "([\\w][\\w\\d\\.@-]*)" + // host
-                    "\\s(.*)$";  // body
-
-    public static final Collection<Pattern> MESSAGE_PATTERNS;
-    static {
-        List<Pattern> patterns = new ArrayList<>();
-        patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0));
-        patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0));
-        MESSAGE_PATTERNS = Collections.unmodifiableList(patterns);
-    }
-
-    // capture group positions from the above message patterns
-    public static final int SYSLOG_PRIORITY_POS = 1;
-    public static final int SYSLOG_VERSION_POS = 2;
-    public static final int SYSLOG_TIMESTAMP_POS = 3;
-    public static final int SYSLOG_HOSTNAME_POS = 4;
-    public static final int SYSLOG_BODY_POS = 5;
-
-    private Charset charset;
-
-    public SyslogParser(final Charset charset) {
-        this.charset = charset;
-    }
-
-    /**
-     *  Parses a SyslogEvent from a byte buffer.
-     *
-     * @param buffer a byte buffer containing a syslog message
-     * @return a SyslogEvent parsed from the byte array
-     */
-    public SyslogEvent parseEvent(final ByteBuffer buffer) {
-        return parseEvent(buffer, null);
-    }
-
-    /**
-     *  Parses a SyslogEvent from a byte buffer.
-     *
-     * @param buffer a byte buffer containing a syslog message
-     * @param sender the hostname of the syslog server that sent the message
-     * @return a SyslogEvent parsed from the byte array
-     */
-    public SyslogEvent parseEvent(final ByteBuffer buffer, final String 
sender) {
-        if (buffer == null) {
-            return null;
-        }
-        if (buffer.position() != 0) {
-            buffer.flip();
-        }
-        byte bytes[] = new byte[buffer.limit()];
-        buffer.get(bytes, 0, buffer.limit());
-        return parseEvent(bytes, sender);
-    }
-
-    /**
-     * Parses a SyslogEvent from a byte array.
-     *
-     * @param bytes a byte array containing a syslog message
-     * @param sender the hostname of the syslog server that sent the message
-     * @return a SyslogEvent parsed from the byte array
-     */
-    public SyslogEvent parseEvent(final byte[] bytes, final String sender) {
-        if (bytes == null || bytes.length == 0) {
-            return null;
-        }
-
-        // remove trailing new line before parsing
-        int length = bytes.length;
-        if (bytes[length - 1] == '\n') {
-            length = length - 1;
-        }
-
-        final String message = new String(bytes, 0, length, charset);
-
-        final SyslogEvent.Builder builder = new SyslogEvent.Builder()
-                
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
-
-        for (Pattern pattern : MESSAGE_PATTERNS) {
-            final Matcher matcher = pattern.matcher(message);
-            if (!matcher.matches()) {
-                continue;
-            }
-
-            final MatchResult res = matcher.toMatchResult();
-            for (int grp = 1; grp <= res.groupCount(); grp++) {
-                String value = res.group(grp);
-                if (grp == SYSLOG_TIMESTAMP_POS) {
-                    builder.timestamp(value);
-                } else if (grp == SYSLOG_HOSTNAME_POS) {
-                    builder.hostname(value);
-                } else if (grp == SYSLOG_PRIORITY_POS) {
-                    int pri = Integer.parseInt(value);
-                    int sev = pri % 8;
-                    int facility = pri / 8;
-                    builder.priority(value);
-                    builder.severity(String.valueOf(sev));
-                    builder.facility(String.valueOf(facility));
-                } else if (grp == SYSLOG_VERSION_POS) {
-                    builder.version(value);
-                } else if (grp == SYSLOG_BODY_POS) {
-                    builder.msgBody(value);
-                }
-            }
-
-            builder.valid(true);
-            break;
-        }
-
-        // either invalid w/original msg, or fully parsed event
-        return builder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index befa5e7..66f1c27 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
 org.apache.nifi.processors.standard.ListFile
 org.apache.nifi.processors.standard.ListenHTTP
+org.apache.nifi.processors.standard.ListenRELP
 org.apache.nifi.processors.standard.ListenSyslog
 org.apache.nifi.processors.standard.ListenUDP
 org.apache.nifi.processors.standard.ListSFTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
new file mode 100644
index 0000000..877a55a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.relp.event.RELPEvent;
+import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for ListenRELP. Each test writes encoded RELP frames to the port the processor is
+ * listening on (over a plain or TLS socket), triggers the processor, and then verifies the
+ * transferred flow files, the provenance events and the responses the processor produced.
+ */
+public class TestListenRELP {
+
+    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+            .txnr(2)
+            .command("syslog")
+            .dataLength(SYSLOG_FRAME_DATA.length())
+            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    private RELPEncoder encoder;
+    private ResponseCapturingListenRELP proc;
+    private TestRunner runner;
+
+    /**
+     * Creates a fresh encoder, processor and test runner before every test and asks the
+     * processor to listen on port 0.
+     */
+    @Before
+    public void setup() {
+        encoder = new RELPEncoder(StandardCharsets.UTF_8);
+        proc = new ResponseCapturingListenRELP();
+        runner = TestRunners.newTestRunner(proc);
+        // NOTE(review): this configures a ListenRELP processor through ListenSyslog's descriptor;
+        // presumably both share the same property name - confirm and switch to ListenRELP's own.
+        runner.setProperty(ListenSyslog.PORT, "0");
+    }
+
+    /**
+     * Sends open, three syslog and close frames without batching and expects one flow file,
+     * one provenance event and one response per syslog frame.
+     */
+    @Test
+    public void testListenRELP() throws IOException, InterruptedException {
+        final List<RELPFrame> frames = new ArrayList<>();
+        frames.add(OPEN_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(CLOSE_FRAME);
+
+        // three syslog frames should be transferred and three responses should be sent
+        run(frames, 3, 3, null);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(3, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+        Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
+        Assert.assertEquals(3, mockFlowFiles.size());
+
+        final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
+        Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
+        Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+    }
+
+    /**
+     * Sends the same frames with a max batch size of 5 and expects a single flow file and
+     * provenance event, while still expecting one response per syslog frame.
+     */
+    @Test
+    public void testBatching() throws IOException, InterruptedException {
+        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
+
+        final List<RELPFrame> frames = new ArrayList<>();
+        frames.add(OPEN_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(CLOSE_FRAME);
+
+        // one flow file should be transferred since we are batching, but three responses should be sent
+        run(frames, 1, 3, null);
+
+        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
+        Assert.assertNotNull(events);
+        Assert.assertEquals(1, events.size());
+
+        final ProvenanceEventRecord event = events.get(0);
+        Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
+        Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
+
+        final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
+        Assert.assertEquals(1, mockFlowFiles.size());
+
+        final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
+        Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
+        Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
+    }
+
+    /**
+     * Sends open, five syslog and close frames over a TLS socket built from a
+     * StandardSSLContextService that uses the localhost test keystore and truststore.
+     */
+    @Test
+    public void testTLS() throws InitializationException, IOException, InterruptedException {
+        final SSLContextService sslContextService = new StandardSSLContextService();
+        runner.addControllerService("ssl-context", sslContextService);
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
+        runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS");
+        runner.enableControllerService(sslContextService);
+
+        // NOTE(review): uses PostHTTP's descriptor to configure a ListenRELP processor;
+        // presumably the property names match - confirm and switch to ListenRELP's own.
+        runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context");
+
+        final List<RELPFrame> frames = new ArrayList<>();
+        frames.add(OPEN_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(SYSLOG_FRAME);
+        frames.add(CLOSE_FRAME);
+
+        // five syslog frames should be transferred and five responses should be sent
+        run(frames, 5, 5, sslContextService);
+    }
+
+    /**
+     * Schedules the processor, sends the given frames to it and triggers it until the expected
+     * number of responses has been captured or ten seconds have elapsed, then asserts on the
+     * response count and on the number of flow files transferred to REL_SUCCESS.
+     *
+     * @param frames the frames to send, in order
+     * @param expectedTransferred the number of flow files expected on REL_SUCCESS
+     * @param expectedResponses the number of responses the processor is expected to produce
+     * @param sslContextService when non-null, the client connects with an SSL socket created
+     *                          from this service; otherwise a plain socket is used
+     */
+    protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContextService sslContextService)
+            throws IOException, InterruptedException {
+
+        Socket socket = null;
+        try {
+            // schedule to start listening on a random port
+            final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+            final ProcessContext context = runner.getProcessContext();
+            proc.onScheduled(context);
+
+            // create a client connection to the port the dispatcher is listening on
+            final int realPort = proc.getDispatcherPort();
+
+            // create either a regular socket or ssl socket based on context being passed in
+            if (sslContextService != null) {
+                final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+                socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
+            } else {
+                socket = new Socket("localhost", realPort);
+            }
+            Thread.sleep(100);
+
+            // send the frames to the port the processor is listening on
+            sendFrames(frames, socket);
+
+            // call onTrigger until every expected response has been captured, or a certain amount
+            // of time passes; the wait is keyed on responses because that is what is asserted
+            // first, and with batching there are more responses than transferred flow files
+            long responseTimeout = 10000;
+            long startTime = System.currentTimeMillis();
+            while (proc.responses.size() < expectedResponses
+                    && (System.currentTimeMillis() - startTime < responseTimeout)) {
+                proc.onTrigger(context, processSessionFactory);
+                Thread.sleep(100);
+            }
+
+            // should have gotten a response for each frame
+            Assert.assertEquals(expectedResponses, proc.responses.size());
+
+            // should have transferred the expected events
+            runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred);
+
+        } finally {
+            // unschedule to close connections
+            proc.onUnscheduled();
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * Encodes each frame and writes it to the socket's output stream, pausing one millisecond
+     * between frames, and flushes the stream once all frames are written.
+     */
+    private void sendFrames(final List<RELPFrame> frames, final Socket socket) throws IOException, InterruptedException {
+        // send the provided messages
+        for (final RELPFrame frame : frames) {
+            byte[] encodedFrame = encoder.encode(frame);
+            socket.getOutputStream().write(encodedFrame);
+            Thread.sleep(1);
+        }
+        socket.getOutputStream().flush();
+    }
+
+    // Extend ListenRELP so that every response handed to respond() is recorded for the
+    // assertions in run() before being passed on to the real implementation
+    private static class ResponseCapturingListenRELP extends ListenRELP {
+
+        private final List<RELPResponse> responses = new ArrayList<>();
+
+        @Override
+        protected void respond(RELPEvent event, RELPResponse relpResponse) {
+            this.responses.add(relpResponse);
+            super.respond(event, relpResponse);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 5fb26c8..360dfe7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -16,19 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
@@ -36,9 +23,9 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.ListenSyslog.RawSyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
+import org.apache.nifi.processors.standard.syslog.SyslogEvent;
+import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.IntegerHolder;
@@ -50,6 +37,19 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestListenSyslog {
 
     static final Logger LOGGER = 
LoggerFactory.getLogger(TestListenSyslog.class);
@@ -301,9 +301,9 @@ public class TestListenSyslog {
             runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1);
 
             final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0);
-            Assert.assertEquals("0", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
-            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
-            
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
+            Assert.assertEquals("0", 
flowFile.getAttribute(SyslogAttributes.PORT.key()));
+            Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), 
flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
+            
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
 
             final String content = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8);
             final String[] splits = content.split("\\|");
@@ -391,16 +391,16 @@ public class TestListenSyslog {
 
     @Test
     public void testErrorQueue() throws IOException {
-        final List<RawSyslogEvent> msgs = new ArrayList<>();
-        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
-        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01"));
+        final List<ListenSyslog.RawSyslogEvent> msgs = new ArrayList<>();
+        msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), 
"sender-01"));
+        msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), 
"sender-01"));
 
         // Add message that will throw a FlowFileAccessException the first 
time that we attempt to read
-        // the contents but will succeeed the second time.
+        // the contents but will succeed the second time.
         final IntegerHolder getMessageAttempts = new IntegerHolder(0);
-        msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") {
+        msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), 
"sender-01") {
             @Override
-            public byte[] getRawMessage() {
+            public byte[] getData() {
                 final int attempts = getMessageAttempts.incrementAndGet();
                 if (attempts == 1) {
                     throw new FlowFileAccessException("Unit test failure");
@@ -432,16 +432,16 @@ public class TestListenSyslog {
 
     private void checkFlowFile(final MockFlowFile flowFile, final int port, 
final String protocol) {
         flowFile.assertContentEquals(VALID_MESSAGE);
-        Assert.assertEquals(PRI, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key()));
-        Assert.assertEquals(SEV, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key()));
-        Assert.assertEquals(FAC, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key()));
-        Assert.assertEquals(TIME, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key()));
-        Assert.assertEquals(HOST, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key()));
-        Assert.assertEquals(BODY, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key()));
-        Assert.assertEquals("true", 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key()));
-        Assert.assertEquals(String.valueOf(port), 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key()));
-        Assert.assertEquals(protocol, 
flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key()));
-        
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key())));
+        Assert.assertEquals(PRI, 
flowFile.getAttribute(SyslogAttributes.PRIORITY.key()));
+        Assert.assertEquals(SEV, 
flowFile.getAttribute(SyslogAttributes.SEVERITY.key()));
+        Assert.assertEquals(FAC, 
flowFile.getAttribute(SyslogAttributes.FACILITY.key()));
+        Assert.assertEquals(TIME, 
flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key()));
+        Assert.assertEquals(HOST, 
flowFile.getAttribute(SyslogAttributes.HOSTNAME.key()));
+        Assert.assertEquals(BODY, 
flowFile.getAttribute(SyslogAttributes.BODY.key()));
+        Assert.assertEquals("true", 
flowFile.getAttribute(SyslogAttributes.VALID.key()));
+        Assert.assertEquals(String.valueOf(port), 
flowFile.getAttribute(SyslogAttributes.PORT.key()));
+        Assert.assertEquals(protocol, 
flowFile.getAttribute(SyslogAttributes.PROTOCOL.key()));
+        
Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key())));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
index a1a4d04..9e84b37 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java
@@ -17,7 +17,7 @@
 
 package org.apache.nifi.processors.standard;
 
-import 
org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes;
+import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java
new file mode 100644
index 0000000..06ed76e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Command-line utility that connects to a RELP listener and sends an open frame, a given
+ * number of syslog frames and finally a close frame, optionally pausing at a fixed interval.
+ */
+public class RELPFrameProducer {
+
+    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    // Kept for any existing reference to this constant; main() builds its own close frame
+    // because a fixed txnr of 3 collides with a syslog frame once two or more messages are sent.
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    /**
+     * Sends the frames to the given host and port.
+     *
+     * @param args exactly five values: HOST, PORT, NUM_MSGS, DELAY_INTERVAL and DELAY_MILLIS.
+     *             Whenever the transaction number of the frame just written is a multiple of
+     *             DELAY_INTERVAL the output is flushed and the thread sleeps DELAY_MILLIS;
+     *             a DELAY_INTERVAL of 0 disables the pauses.
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length != 5) {
+            System.err.println("USAGE: RELPFrameProducer <HOST> <PORT> <NUM_MSGS> <DELAY_INTERVAL> <DELAY_MILLIS>");
+            System.exit(1);
+        }
+
+        final String host = args[0];
+        final int port = Integer.parseInt(args[1]);
+        final int numMessages = Integer.parseInt(args[2]);
+        final int delayInterval = Integer.parseInt(args[3]);
+        final long delay = Long.parseLong(args[4]);
+
+        final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
+
+        Socket socket = null;
+        try {
+            socket = new Socket(host, port);
+
+            try (final OutputStream out = new BufferedOutputStream(socket.getOutputStream())) {
+                // send the open frame
+                out.write(encoder.encode(OPEN_FRAME));
+
+                // send the specified number of syslog messages; the open frame used txnr 1,
+                // so the syslog frames use txnr 2 .. numMessages + 1
+                for (int i=2; i < (numMessages+2); i++) {
+                    final byte[] data = ("this is message # " + i).getBytes(StandardCharsets.UTF_8);
+
+                    final RELPFrame syslogFrame = new RELPFrame.Builder()
+                            .txnr(i)
+                            .command("syslog")
+                            .dataLength(data.length)
+                            .data(data)
+                            .build();
+
+                    out.write(encoder.encode(syslogFrame));
+
+                    // guard against a zero interval, which would otherwise fail the modulo
+                    if (delayInterval != 0 && i % delayInterval == 0) {
+                        // i is the txnr, which is one ahead of the number of messages sent
+                        System.out.println("Sent " + (i - 1) + " messages");
+                        out.flush();
+                        Thread.sleep(delay);
+                    }
+                }
+
+                // send the close frame with the next unused txnr
+                final RELPFrame closeFrame = new RELPFrame.Builder()
+                        .txnr(numMessages + 2)
+                        .command("close")
+                        .dataLength(0)
+                        .data(new byte[0])
+                        .build();
+                out.write(encoder.encode(closeFrame));
+
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so callers can observe the interruption
+                Thread.currentThread().interrupt();
+                e.printStackTrace();
+            }
+
+        } catch (final IOException e) {
+            e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
new file mode 100644
index 0000000..5d86c26
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.event;
+
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import 
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Verifies that RELPEventFactory copies the supplied data, metadata and responder into the
+ * RELPEvent it creates.
+ */
+public class TestRELPEventFactory {
+
+    /**
+     * Builds an event from a payload, sender/txnr/command metadata and a responder, then checks
+     * that each value can be read back from the event.
+     */
+    @Test
+    public void testCreateRELPEvent() {
+        final byte[] data = "this is an event".getBytes(StandardCharsets.UTF_8);
+
+        final String sender = "sender1";
+        final long txnr = 1;
+        final String command = "syslog";
+
+        final Map<String,String> metadata = new HashMap<>();
+        metadata.put(EventFactory.SENDER_KEY, sender);
+        metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(txnr));
+        metadata.put(RELPMetadata.COMMAND_KEY, command);
+
+        final ChannelResponder responder = new SocketChannelResponder(null);
+
+        final EventFactory<RELPEvent> factory = new RELPEventFactory();
+
+        final RELPEvent event = factory.create(data, metadata, responder);
+        // compare array contents; assertEquals on arrays only checks reference identity
+        Assert.assertArrayEquals(data, event.getData());
+        Assert.assertEquals(sender, event.getSender());
+        Assert.assertEquals(txnr, event.getTxnr());
+        Assert.assertEquals(command, event.getCommand());
+        Assert.assertEquals(responder, event.getResponder());
+    }
+}

Reply via email to