Repository: nifi
Updated Branches:
  refs/heads/master 2f5f7b830 -> 1089f0a95


http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java
new file mode 100644
index 0000000..ecd3921
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestRELPDecoder {
+
+    public static final String OPEN_FRAME_DATA = 
"relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";;
+    public static final String OPEN_FRAME = "1 open 85 " + OPEN_FRAME_DATA + 
"\n";
+
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message 
here";
+    public static final String SYSLOG_FRAME = "2 syslog 29 " + 
SYSLOG_FRAME_DATA + "\n";
+
+    public static final String CLOSE_FRAME = "3 close 0\n";
+
+    private RELPDecoder decoder;
+
+    @Before
+    public void setup() {
+        this.decoder = new RELPDecoder(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testDecodeSingleFrame() throws RELPFrameException {
+        final byte[] input = OPEN_FRAME.getBytes(StandardCharsets.UTF_8);
+
+        RELPFrame frame = null;
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frame = decoder.getFrame();
+                break;
+            }
+        }
+
+        Assert.assertNotNull(frame);
+        Assert.assertEquals(1, frame.getTxnr());
+        Assert.assertEquals("open", frame.getCommand());
+        Assert.assertEquals(85, frame.getDataLength());
+
+        Assert.assertNotNull(frame.getData());
+        Assert.assertEquals(OPEN_FRAME_DATA, new String(frame.getData(), 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testDecodeMultipleCommands() throws RELPFrameException {
+        final byte[] input = (OPEN_FRAME + SYSLOG_FRAME + 
CLOSE_FRAME).getBytes(StandardCharsets.UTF_8);
+
+        List<RELPFrame> frames = new ArrayList<>();
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frames.add(decoder.getFrame());
+            }
+        }
+
+        Assert.assertEquals(3, frames.size());
+
+        final RELPFrame frame1 = frames.get(0);
+        Assert.assertNotNull(frame1);
+        Assert.assertEquals(1, frame1.getTxnr());
+        Assert.assertEquals("open", frame1.getCommand());
+        Assert.assertEquals(85, frame1.getDataLength());
+
+        Assert.assertNotNull(frame1.getData());
+        Assert.assertEquals(OPEN_FRAME_DATA, new String(frame1.getData(), 
StandardCharsets.UTF_8));
+
+        final RELPFrame frame2 = frames.get(1);
+        Assert.assertNotNull(frame2);
+        Assert.assertEquals(2, frame2.getTxnr());
+        Assert.assertEquals("syslog", frame2.getCommand());
+        Assert.assertEquals(29, frame2.getDataLength());
+
+        Assert.assertNotNull(frame2.getData());
+        Assert.assertEquals(SYSLOG_FRAME_DATA, new String(frame2.getData(), 
StandardCharsets.UTF_8));
+
+        final RELPFrame frame3 = frames.get(2);
+        Assert.assertNotNull(frame3);
+        Assert.assertEquals(3, frame3.getTxnr());
+        Assert.assertEquals("close", frame3.getCommand());
+        Assert.assertEquals(0, frame3.getDataLength());
+    }
+
+    @Test
+    public void testDecodeMultipleSyslogCommands() throws RELPFrameException {
+        final String msg1 = "1 syslog 20 this is message 1234\n";
+        final String msg2 = "2 syslog 22 this is message 456789\n";
+        final String msg3 = "3 syslog 21 this is message ABCDE\n";
+        final String msg = msg1 + msg2 + msg3;
+
+        final byte[] input = msg.getBytes(StandardCharsets.UTF_8);
+
+        List<RELPFrame> frames = new ArrayList<>();
+
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frames.add(decoder.getFrame());
+            }
+        }
+
+        Assert.assertEquals(3, frames.size());
+    }
+
+    @Test(expected = RELPFrameException.class)
+    public void testBadDataShouldThrowException() throws RELPFrameException {
+        final String msg = "NAN syslog 20 this is message 1234\n";
+        final byte[] input = msg.getBytes(StandardCharsets.UTF_8);
+
+        List<RELPFrame> frames = new ArrayList<>();
+
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frames.add(decoder.getFrame());
+            }
+        }
+
+        Assert.fail("Should have thrown exception");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java
new file mode 100644
index 0000000..94cfe39
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class TestRELPEncoder {
+
+    @Test
+    public void testEncodingWithData() throws IOException {
+        final RELPFrame frame = new RELPFrame.Builder()
+                .txnr(1)
+                .command("rsp")
+                .dataLength(5)
+                .data("12345".getBytes(StandardCharsets.UTF_8))
+                .build();
+
+        final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
+
+        final byte[] result = encoder.encode(frame);
+
+        final String expected = "1 rsp 5 12345\n";
+        Assert.assertEquals(expected, new String(result, 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testEncodingNoData() throws IOException {
+        final RELPFrame frame = new RELPFrame.Builder()
+                .txnr(1)
+                .command("rsp")
+                .dataLength(0)
+                .data(new byte[0])
+                .build();
+
+        final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
+
+        final byte[] result = encoder.encode(frame);
+
+        final String expected = "1 rsp 0\n";
+        Assert.assertEquals(expected, new String(result, 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testEncodingOpenResponse() {
+        final String openFrameData = 
"relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";;
+        final String openFrame = "1 open 85 " + openFrameData + "\n";
+        System.out.println(openFrame);
+
+        final RELPDecoder decoder = new RELPDecoder(StandardCharsets.UTF_8);
+        final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
+
+        RELPFrame frame = null;
+        for (byte b : openFrame.getBytes(StandardCharsets.UTF_8)) {
+            if (decoder.process(b)) {
+                frame = decoder.getFrame();
+                break;
+            }
+        }
+
+        Assert.assertNotNull(frame);
+
+        final Map<String,String> offers = 
RELPResponse.parseOffers(frame.getData(), StandardCharsets.UTF_8);
+        final RELPFrame responseFrame = RELPResponse.open(frame.getTxnr(), 
offers).toFrame(StandardCharsets.UTF_8);
+
+        final byte[] response = encoder.encode(responseFrame);
+        System.out.println(new String(response, StandardCharsets.UTF_8));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java
new file mode 100644
index 0000000..6c3eddb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import org.junit.Test;
+
+public class TestRELPFrame {
+
+    @Test(expected = RELPFrameException.class)
+    public void testInvalidTxnr() {
+        new RELPFrame.Builder().command("command").dataLength(5).data(new 
byte[5]).build();
+    }
+
+    @Test(expected = RELPFrameException.class)
+    public void testInvalidCommand() {
+        new RELPFrame.Builder().txnr(1).dataLength(5).data(new 
byte[5]).build();
+    }
+
+    @Test(expected = RELPFrameException.class)
+    public void testBlankCommand() {
+        new RELPFrame.Builder().txnr(1).command("  ").dataLength(5).data(new 
byte[5]).build();
+    }
+
+    @Test(expected = RELPFrameException.class)
+    public void testInvalidDataLength() {
+        new RELPFrame.Builder().txnr(1).command("command").data(new 
byte[5]).build();
+    }
+
+    @Test(expected = RELPFrameException.class)
+    public void testInvalidData() {
+        new 
RELPFrame.Builder().txnr(1).command("command").dataLength(5).data(null).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
new file mode 100644
index 0000000..38b0572
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPEvent;
+import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRELPFrameHandler {
+
+    private Charset charset;
+    private EventFactory<RELPEvent> eventFactory;
+    private BlockingQueue<RELPEvent> events;
+    private SelectionKey key;
+    private AsyncChannelDispatcher dispatcher;
+
+    private RELPFrameHandler<RELPEvent> frameHandler;
+
+    @Before
+    public void setup() {
+        this.charset = StandardCharsets.UTF_8;
+        this.eventFactory = new RELPEventFactory();
+        this.events = new LinkedBlockingQueue<>();
+        this.key = Mockito.mock(SelectionKey.class);
+        this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
+
+        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, 
events, dispatcher);
+    }
+
+    @Test
+    public void testOpen() throws IOException, InterruptedException {
+        final String offer1 = "relp_version=0";
+        final String offer2 = 
"relp_software=librelp,1.2.7,http://librelp.adiscon.com";;
+        final String offer3 = "commands=syslog";
+
+        final String data = offer1 + "\n" + offer2 + "\n" + offer3;
+
+        final RELPFrame openFrame = new RELPFrame.Builder()
+                .txnr(1).command("open")
+                .dataLength(data.length())
+                .data(data.getBytes(charset))
+                .build();
+
+        final String sender = "sender1";
+        final CapturingChannelResponder responder = new 
CapturingChannelResponder();
+
+        // call the handler and verify respond() was called once with once 
response
+        frameHandler.handle(openFrame, responder, sender);
+        Assert.assertEquals(1, responder.responded);
+        Assert.assertEquals(1, responder.responses.size());
+
+        // verify the response sent back the offers that were received
+        final ChannelResponse response = responder.responses.get(0);
+        final String responseData = new String(response.toByteArray(), 
charset);
+        Assert.assertTrue(responseData.contains(offer1));
+        Assert.assertTrue(responseData.contains(offer2));
+        Assert.assertTrue(responseData.contains(offer3));
+    }
+
+    @Test
+    public void testClose() throws IOException, InterruptedException {
+        final RELPFrame openFrame = new RELPFrame.Builder()
+                .txnr(1).command("close")
+                .dataLength(0)
+                .data(new byte[0])
+                .build();
+
+        final String sender = "sender1";
+        final CapturingChannelResponder responder = new 
CapturingChannelResponder();
+
+        // call the handler and verify respond() was called once with once 
response
+        frameHandler.handle(openFrame, responder, sender);
+        Assert.assertEquals(1, responder.responded);
+        Assert.assertEquals(1, responder.responses.size());
+
+        // verify the response sent back the offers that were received
+        final ChannelResponse response = responder.responses.get(0);
+        final String responseData = new String(response.toByteArray(), 
charset);
+        Assert.assertTrue(responseData.contains("200 OK"));
+    }
+
+    @Test
+    public void testCommand() throws IOException, InterruptedException {
+        final String data = "this is a syslog message";
+
+        final RELPFrame openFrame = new RELPFrame.Builder()
+                .txnr(1).command("syslog")
+                .dataLength(data.length())
+                .data(data.getBytes(charset))
+                .build();
+
+        final String sender = "sender1";
+        final CapturingChannelResponder responder = new 
CapturingChannelResponder();
+
+        // call the handler and verify respond() was called once with once 
response
+        frameHandler.handle(openFrame, responder, sender);
+        Assert.assertEquals(0, responder.responded);
+        Assert.assertEquals(0, responder.responses.size());
+        Assert.assertEquals(1, events.size());
+
+        final RELPEvent event = events.poll();
+        Assert.assertEquals(data, new String(event.getData(), charset));
+    }
+
+    private static class CapturingChannelResponder implements 
ChannelResponder<SocketChannel> {
+
+        int responded;
+        List<ChannelResponse> responses = new ArrayList<>();
+
+        @Override
+        public SocketChannel getChannel() {
+            return Mockito.mock(SocketChannel.class);
+        }
+
+        @Override
+        public List<ChannelResponse> getResponses() {
+            return responses;
+        }
+
+        @Override
+        public void addResponse(ChannelResponse response) {
+            responses.add(response);
+        }
+
+        @Override
+        public void respond() throws IOException {
+            responded++;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
new file mode 100644
index 0000000..2a51ad6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRELPSocketChannelHandler {
+
+    private EventFactory<TestEvent> eventFactory;
+    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> 
channelHandlerFactory;
+    private BlockingQueue<ByteBuffer> byteBuffers;
+    private BlockingQueue<TestEvent> events;
+    private ProcessorLog logger = Mockito.mock(ProcessorLog.class);
+    private int maxConnections;
+    private SSLContext sslContext;
+    private Charset charset;
+    private ChannelDispatcher dispatcher;
+
+    @Before
+    public void setup() {
+        eventFactory = new TestEventHolderFactory();
+        channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
+
+        byteBuffers = new LinkedBlockingQueue<>();
+        byteBuffers.add(ByteBuffer.allocate(4096));
+
+        events = new LinkedBlockingQueue<>();
+        logger = Mockito.mock(ProcessorLog.class);
+
+        maxConnections = 1;
+        sslContext = null;
+        charset = StandardCharsets.UTF_8;
+
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBuffers, events, logger,
+                maxConnections, sslContext, charset);
+
+    }
+
+    @Test
+    public void testBasicHandling() throws IOException, InterruptedException {
+        final List<String> messages = new ArrayList<>();
+        messages.add("1 syslog 20 this is message 1234\n");
+        messages.add("2 syslog 22 this is message 456789\n");
+        messages.add("3 syslog 21 this is message ABCDE\n");
+
+        run(messages);
+        Assert.assertEquals(messages.size(), events.size());
+
+        boolean found1 = false;
+        boolean found2 = false;
+        boolean found3 = false;
+
+        TestEvent event;
+        while((event = events.poll()) != null) {
+            Map<String,String> metadata = event.metadata;
+            Assert.assertTrue(metadata.containsKey(RELPMetadata.TXNR_KEY));
+
+            final String txnr = metadata.get(RELPMetadata.TXNR_KEY);
+            if (txnr.equals("1")) {
+                found1 = true;
+            } else if (txnr.equals("2")) {
+                found2 = true;
+            } else if (txnr.equals("3")) {
+                found3 = true;
+            }
+        }
+
+        Assert.assertTrue(found1);
+        Assert.assertTrue(found2);
+        Assert.assertTrue(found3);
+    }
+
+    @Test
+    public void testLotsOfFrames() throws IOException, InterruptedException {
+        final String baseMessage = " syslog 19 this is message ";
+        final List<String> messages = new ArrayList<>();
+
+        for (int i=100; i < 1000; i++) {
+            messages.add(i + baseMessage + i + "\n");
+        }
+
+        run(messages);
+        Assert.assertEquals(messages.size(), events.size());
+    }
+
+    protected void run(List<String> messages) throws IOException, 
InterruptedException {
+        final ByteBuffer buffer = ByteBuffer.allocate(1024);
+        try {
+            // starts the dispatcher listening on port 0 so it selects a 
random port
+            dispatcher.open(0, 4096);
+
+            // starts a thread to run the dispatcher which will accept/read 
connections
+            Thread dispatcherThread = new Thread(dispatcher);
+            dispatcherThread.start();
+
+
+            // create a client connection to the port the dispatcher is 
listening on
+            final int realPort = dispatcher.getPort();
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", realPort));
+                Thread.sleep(100);
+
+                // send the provided messages
+                for (int i=0; i < messages.size(); i++) {
+                    buffer.clear();
+                    buffer.put(messages.get(i).getBytes(charset));
+                    buffer.flip();
+
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(1);
+                }
+            }
+
+            // wait up to 10 seconds to verify the responses
+            long timeout = 10000;
+            long startTime = System.currentTimeMillis();
+            while (events.size() < messages.size() && 
(System.currentTimeMillis() - startTime < timeout)) {
+                Thread.sleep(100);
+            }
+
+            // should have gotten an event for each message sent
+            Assert.assertEquals(messages.size(), events.size());
+
+        } finally {
+            // stop the dispatcher thread and ensure we shut down handler 
threads
+            dispatcher.stop();
+            dispatcher.close();
+        }
+    }
+
+    // Test event to produce from the data
+    private static class TestEvent implements Event<SocketChannel> {
+
+        private byte[] data;
+        private Map<String,String> metadata;
+
+        public TestEvent(byte[] data, Map<String, String> metadata) {
+            this.data = data;
+            this.metadata = metadata;
+        }
+
+        @Override
+        public String getSender() {
+            return metadata.get(EventFactory.SENDER_KEY);
+        }
+
+        @Override
+        public byte[] getData() {
+            return data;
+        }
+
+        @Override
+        public ChannelResponder<SocketChannel> getResponder() {
+            return null;
+        }
+    }
+
+    // Factory to create test events and send responses for testing
+    private static class TestEventHolderFactory implements 
EventFactory<TestEvent> {
+
+        @Override
+        public TestEvent create(final byte[] data, final Map<String, String> 
metadata, final ChannelResponder responder) {
+            return new TestEvent(data, metadata);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java
new file mode 100644
index 0000000..bf90e83
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.response;
+
+import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestRELPResponse {
+
+    @Test
+    public void testResponseToFrame() throws IOException {
+        final long txnr = 123456789;
+        final int code = RELPResponse.OK;
+        final String message = "this is a message";
+        final String data = "this is some data";
+
+        final RELPResponse response = new RELPResponse(txnr, code, message, 
data);
+
+        final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = code + " " + message + "\n" + data;
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testResponseToFrameNoMessage() throws IOException {
+        final long txnr = 123456789;
+        final int code = RELPResponse.OK;
+        final String data = "this is some data";
+
+        final RELPResponse response = new RELPResponse(txnr, code, null, data);
+
+        final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = code + "\n" + data;
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testResponseToFrameNoData() throws IOException {
+        final long txnr = 123456789;
+        final int code = RELPResponse.OK;
+        final String message = "this is a message";
+
+        final RELPResponse response = new RELPResponse(txnr, code, message, 
null);
+
+        final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = code + " " + message;
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testResponseToFrameNoDataNoMessage() throws IOException {
+        final long txnr = 123456789;
+        final int code = RELPResponse.OK;
+
+        final RELPResponse response = new RELPResponse(txnr, code);
+
+        final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = code + "";
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testCreateOpenResponse() {
+        final long txnr = 123456789;
+
+        final Map<String,String> offers = new HashMap<>();
+        offers.put("key1", "val1");
+        offers.put("key2", "val2");
+
+        final RELPResponse openResponse = RELPResponse.open(txnr, offers);
+
+        final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected1 = RELPResponse.OK + " OK\n" + 
"key1=val1\nkey2=val2";
+        final String expected2 = RELPResponse.OK + " OK\n" + 
"key2=val2\nkey1=val1";
+        Assert.assertTrue(result.equals(expected1) || 
result.equals(expected2));
+        Assert.assertEquals(expected1.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testCreateOpenResponseNoOffers() {
+        final long txnr = 123456789;
+        final Map<String,String> offers = new HashMap<>();
+
+        final RELPResponse openResponse = RELPResponse.open(txnr, offers);
+
+        final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = RELPResponse.OK + " OK\n";
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testCreateOkResponse() {
+        final long txnr = 123456789;
+        final RELPResponse openResponse = RELPResponse.ok(txnr);
+
+        final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = RELPResponse.OK + " OK";
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+
+    @Test
+    public void testCreateErrorResponse() {
+        final long txnr = 123456789;
+        final RELPResponse openResponse = RELPResponse.error(txnr);
+
+        final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8);
+        Assert.assertEquals(txnr, frame.getTxnr());
+        Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand());
+
+        final String result = new String(frame.getData(), 
StandardCharsets.UTF_8);
+        final String expected = RELPResponse.ERROR + " ERROR";
+        Assert.assertEquals(expected, result);
+        Assert.assertEquals(expected.length(), frame.getDataLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
index d1b1bbf..24fd59e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import org.apache.nifi.processors.standard.syslog.SyslogEvent;
+import org.apache.nifi.processors.standard.syslog.SyslogParser;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

Reply via email to