Repository: nifi Updated Branches: refs/heads/master 2f5f7b830 -> 1089f0a95
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java new file mode 100644 index 0000000..ecd3921 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPDecoder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class TestRELPDecoder { + + public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; + public static final String OPEN_FRAME = "1 open 85 " + OPEN_FRAME_DATA + "\n"; + + public static final String SYSLOG_FRAME_DATA = "this is a syslog message here"; + public static final String SYSLOG_FRAME = "2 syslog 29 " + SYSLOG_FRAME_DATA + "\n"; + + public static final String CLOSE_FRAME = "3 close 0\n"; + + private RELPDecoder decoder; + + @Before + public void setup() { + this.decoder = new RELPDecoder(StandardCharsets.UTF_8); + } + + @Test + public void testDecodeSingleFrame() throws RELPFrameException { + final byte[] input = OPEN_FRAME.getBytes(StandardCharsets.UTF_8); + + RELPFrame frame = null; + for (byte b : input) { + if (decoder.process(b)) { + frame = decoder.getFrame(); + break; + } + } + + Assert.assertNotNull(frame); + Assert.assertEquals(1, frame.getTxnr()); + Assert.assertEquals("open", frame.getCommand()); + Assert.assertEquals(85, frame.getDataLength()); + + Assert.assertNotNull(frame.getData()); + Assert.assertEquals(OPEN_FRAME_DATA, new String(frame.getData(), StandardCharsets.UTF_8)); + } + + @Test + public void testDecodeMultipleCommands() throws RELPFrameException { + final byte[] input = (OPEN_FRAME + SYSLOG_FRAME + CLOSE_FRAME).getBytes(StandardCharsets.UTF_8); + + List<RELPFrame> frames = new ArrayList<>(); + for (byte b : input) { + if (decoder.process(b)) { + frames.add(decoder.getFrame()); + } + } + + Assert.assertEquals(3, frames.size()); + + final RELPFrame frame1 = frames.get(0); + Assert.assertNotNull(frame1); + Assert.assertEquals(1, frame1.getTxnr()); + Assert.assertEquals("open", frame1.getCommand()); + Assert.assertEquals(85, frame1.getDataLength()); + + Assert.assertNotNull(frame1.getData()); + Assert.assertEquals(OPEN_FRAME_DATA, new String(frame1.getData(), StandardCharsets.UTF_8)); + + final RELPFrame frame2 = frames.get(1); + Assert.assertNotNull(frame2); + Assert.assertEquals(2, frame2.getTxnr()); + Assert.assertEquals("syslog", frame2.getCommand()); + Assert.assertEquals(29, frame2.getDataLength()); + + Assert.assertNotNull(frame2.getData()); + Assert.assertEquals(SYSLOG_FRAME_DATA, new String(frame2.getData(), StandardCharsets.UTF_8)); + + final RELPFrame frame3 = frames.get(2); + Assert.assertNotNull(frame3); + Assert.assertEquals(3, frame3.getTxnr()); + Assert.assertEquals("close", frame3.getCommand()); + Assert.assertEquals(0, frame3.getDataLength()); + } + + @Test + public void testDecodeMultipleSyslogCommands() throws RELPFrameException { + final String msg1 = "1 syslog 20 this is message 1234\n"; + final String msg2 = "2 syslog 22 this is message 456789\n"; + final String msg3 = "3 syslog 21 this is message ABCDE\n"; + final String msg = msg1 + msg2 + msg3; + + final byte[] input = msg.getBytes(StandardCharsets.UTF_8); + + List<RELPFrame> frames = new ArrayList<>(); + + for (byte b : input) { + if (decoder.process(b)) { + frames.add(decoder.getFrame()); + } + } + + Assert.assertEquals(3, frames.size()); + } + + @Test(expected = RELPFrameException.class) + public void testBadDataShouldThrowException() throws RELPFrameException { + final String msg = "NAN syslog 20 this is message 1234\n"; + final byte[] input = msg.getBytes(StandardCharsets.UTF_8); + + List<RELPFrame> frames = new ArrayList<>(); + + for (byte b : input) { + if (decoder.process(b)) { + frames.add(decoder.getFrame()); + } + } + + Assert.fail("Should have thrown exception"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java new file mode 100644 index 0000000..94cfe39 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPEncoder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import org.apache.nifi.processors.standard.relp.response.RELPResponse; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class TestRELPEncoder { + + @Test + public void testEncodingWithData() throws IOException { + final RELPFrame frame = new RELPFrame.Builder() + .txnr(1) + .command("rsp") + .dataLength(5) + .data("12345".getBytes(StandardCharsets.UTF_8)) + .build(); + + final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8); + + final byte[] result = encoder.encode(frame); + + final String expected = "1 rsp 5 12345\n"; + Assert.assertEquals(expected, new String(result, StandardCharsets.UTF_8)); + } + + @Test + public void testEncodingNoData() throws IOException { + final RELPFrame frame = new RELPFrame.Builder() + .txnr(1) + .command("rsp") + .dataLength(0) + .data(new byte[0]) + .build(); + + final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8); + + final byte[] result = encoder.encode(frame); + + final String expected = "1 rsp 0\n"; + Assert.assertEquals(expected, new String(result, StandardCharsets.UTF_8)); + } + + @Test + public void testEncodingOpenResponse() { + final String openFrameData = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; + final String openFrame = "1 open 85 " + openFrameData + "\n"; + System.out.println(openFrame); + + final RELPDecoder decoder = new RELPDecoder(StandardCharsets.UTF_8); + final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8); + + RELPFrame frame = null; + for (byte b : openFrame.getBytes(StandardCharsets.UTF_8)) { + if (decoder.process(b)) { + frame = decoder.getFrame(); + break; + } + } + + Assert.assertNotNull(frame); + + final Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), StandardCharsets.UTF_8); + final RELPFrame responseFrame = RELPResponse.open(frame.getTxnr(), offers).toFrame(StandardCharsets.UTF_8); + + final byte[] response = encoder.encode(responseFrame); + System.out.println(new String(response, StandardCharsets.UTF_8)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java new file mode 100644 index 0000000..6c3eddb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/TestRELPFrame.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import org.junit.Test; + +public class TestRELPFrame { + + @Test(expected = RELPFrameException.class) + public void testInvalidTxnr() { + new RELPFrame.Builder().command("command").dataLength(5).data(new byte[5]).build(); + } + + @Test(expected = RELPFrameException.class) + public void testInvalidCommand() { + new RELPFrame.Builder().txnr(1).dataLength(5).data(new byte[5]).build(); + } + + @Test(expected = RELPFrameException.class) + public void testBlankCommand() { + new RELPFrame.Builder().txnr(1).command(" ").dataLength(5).data(new byte[5]).build(); + } + + @Test(expected = RELPFrameException.class) + public void testInvalidDataLength() { + new RELPFrame.Builder().txnr(1).command("command").data(new byte[5]).build(); + } + + @Test(expected = RELPFrameException.class) + public void testInvalidData() { + new RELPFrame.Builder().txnr(1).command("command").dataLength(5).data(null).build(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java new file mode 100644 index 0000000..38b0572 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.standard.relp.event.RELPEvent; +import org.apache.nifi.processors.standard.relp.event.RELPEventFactory; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestRELPFrameHandler { + + private Charset charset; + private EventFactory<RELPEvent> eventFactory; + private BlockingQueue<RELPEvent> events; + private SelectionKey key; + private AsyncChannelDispatcher dispatcher; + + private RELPFrameHandler<RELPEvent> frameHandler; + + @Before + public void setup() { + this.charset = StandardCharsets.UTF_8; + this.eventFactory = new RELPEventFactory(); + this.events = new LinkedBlockingQueue<>(); + this.key = Mockito.mock(SelectionKey.class); + this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); + + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + } + + @Test + public void testOpen() throws IOException, InterruptedException { + final String offer1 = "relp_version=0"; + final String offer2 = "relp_software=librelp,1.2.7,http://librelp.adiscon.com"; + final String offer3 = "commands=syslog"; + + final String data = offer1 + "\n" + offer2 + "\n" + offer3; + + final RELPFrame openFrame = new RELPFrame.Builder() + .txnr(1).command("open") + .dataLength(data.length()) + .data(data.getBytes(charset)) + .build(); + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler and verify respond() was called once with once response + frameHandler.handle(openFrame, responder, sender); + Assert.assertEquals(1, responder.responded); + Assert.assertEquals(1, responder.responses.size()); + + // verify the response sent back the offers that were received + final ChannelResponse response = responder.responses.get(0); + final String responseData = new String(response.toByteArray(), charset); + Assert.assertTrue(responseData.contains(offer1)); + Assert.assertTrue(responseData.contains(offer2)); + Assert.assertTrue(responseData.contains(offer3)); + } + + @Test + public void testClose() throws IOException, InterruptedException { + final RELPFrame openFrame = new RELPFrame.Builder() + .txnr(1).command("close") + .dataLength(0) + .data(new byte[0]) + .build(); + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler and verify respond() was called once with once response + frameHandler.handle(openFrame, responder, sender); + Assert.assertEquals(1, responder.responded); + Assert.assertEquals(1, responder.responses.size()); + + // verify the response sent back the offers that were received + final ChannelResponse response = responder.responses.get(0); + final String responseData = new String(response.toByteArray(), charset); + Assert.assertTrue(responseData.contains("200 OK")); + } + + @Test + public void testCommand() throws IOException, InterruptedException { + final String data = "this is a syslog message"; + + final RELPFrame openFrame = new RELPFrame.Builder() + .txnr(1).command("syslog") + .dataLength(data.length()) + .data(data.getBytes(charset)) + .build(); + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler and verify respond() was called once with once response + frameHandler.handle(openFrame, responder, sender); + Assert.assertEquals(0, responder.responded); + Assert.assertEquals(0, responder.responses.size()); + Assert.assertEquals(1, events.size()); + + final RELPEvent event = events.poll(); + Assert.assertEquals(data, new String(event.getData(), charset)); + } + + private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> { + + int responded; + List<ChannelResponse> responses = new ArrayList<>(); + + @Override + public SocketChannel getChannel() { + return Mockito.mock(SocketChannel.class); + } + + @Override + public List<ChannelResponse> getResponses() { + return responses; + } + + @Override + public void addResponse(ChannelResponse response) { + responses.add(response); + } + + @Override + public void respond() throws IOException { + responded++; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java new file mode 100644 index 0000000..2a51ad6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processors.standard.relp.event.RELPMetadata; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class TestRELPSocketChannelHandler { + + private EventFactory<TestEvent> eventFactory; + private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory; + private BlockingQueue<ByteBuffer> byteBuffers; + private BlockingQueue<TestEvent> events; + private ProcessorLog logger = Mockito.mock(ProcessorLog.class); + private int maxConnections; + private SSLContext sslContext; + private Charset charset; + private ChannelDispatcher dispatcher; + + @Before + public void setup() { + eventFactory = new TestEventHolderFactory(); + channelHandlerFactory = new RELPSocketChannelHandlerFactory<>(); + + byteBuffers = new LinkedBlockingQueue<>(); + byteBuffers.add(ByteBuffer.allocate(4096)); + + events = new LinkedBlockingQueue<>(); + logger = Mockito.mock(ProcessorLog.class); + + maxConnections = 1; + sslContext = null; + charset = StandardCharsets.UTF_8; + + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + maxConnections, sslContext, charset); + + } + + @Test + public void testBasicHandling() throws IOException, InterruptedException { + final List<String> messages = new ArrayList<>(); + messages.add("1 syslog 20 this is message 1234\n"); + messages.add("2 syslog 22 this is message 456789\n"); + messages.add("3 syslog 21 this is message ABCDE\n"); + + run(messages); + Assert.assertEquals(messages.size(), events.size()); + + boolean found1 = false; + boolean found2 = false; + boolean found3 = false; + + TestEvent event; + while((event = events.poll()) != null) { + Map<String,String> metadata = event.metadata; + Assert.assertTrue(metadata.containsKey(RELPMetadata.TXNR_KEY)); + + final String txnr = metadata.get(RELPMetadata.TXNR_KEY); + if (txnr.equals("1")) { + found1 = true; + } else if (txnr.equals("2")) { + found2 = true; + } else if (txnr.equals("3")) { + found3 = true; + } + } + + Assert.assertTrue(found1); + Assert.assertTrue(found2); + Assert.assertTrue(found3); + } + + @Test + public void testLotsOfFrames() throws IOException, InterruptedException { + final String baseMessage = " syslog 19 this is message "; + final List<String> messages = new ArrayList<>(); + + for (int i=100; i < 1000; i++) { + messages.add(i + baseMessage + i + "\n"); + } + + run(messages); + Assert.assertEquals(messages.size(), events.size()); + } + + protected void run(List<String> messages) throws IOException, InterruptedException { + final ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + // starts the dispatcher listening on port 0 so it selects a random port + dispatcher.open(0, 4096); + + // starts a thread to run the dispatcher which will accept/read connections + Thread dispatcherThread = new Thread(dispatcher); + dispatcherThread.start(); + + + // create a client connection to the port the dispatcher is listening on + final int realPort = dispatcher.getPort(); + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", realPort)); + Thread.sleep(100); + + // send the provided messages + for (int i=0; i < messages.size(); i++) { + buffer.clear(); + buffer.put(messages.get(i).getBytes(charset)); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(1); + } + } + + // wait up to 10 seconds to verify the responses + long timeout = 10000; + long startTime = System.currentTimeMillis(); + while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { + Thread.sleep(100); + } + + // should have gotten an event for each message sent + Assert.assertEquals(messages.size(), events.size()); + + } finally { + // stop the dispatcher thread and ensure we shut down handler threads + dispatcher.stop(); + dispatcher.close(); + } + } + + // Test event to produce from the data + private static class TestEvent implements Event<SocketChannel> { + + private byte[] data; + private Map<String,String> metadata; + + public TestEvent(byte[] data, Map<String, String> metadata) { + this.data = data; + this.metadata = metadata; + } + + @Override + public String getSender() { + return metadata.get(EventFactory.SENDER_KEY); + } + + @Override + public byte[] getData() { + return data; + } + + @Override + public ChannelResponder<SocketChannel> getResponder() { + return null; + } + } + + // Factory to create test events and send responses for testing + private static class TestEventHolderFactory implements EventFactory<TestEvent> { + + @Override + public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { + return new TestEvent(data, metadata); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java new file mode 100644 index 0000000..bf90e83 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/response/TestRELPResponse.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.response; + +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TestRELPResponse { + + @Test + public void testResponseToFrame() throws IOException { + final long txnr = 123456789; + final int code = RELPResponse.OK; + final String message = "this is a message"; + final String data = "this is some data"; + + final RELPResponse response = new RELPResponse(txnr, code, message, data); + + final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = code + " " + message + "\n" + data; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testResponseToFrameNoMessage() throws IOException { + final long txnr = 123456789; + final int code = RELPResponse.OK; + final String data = "this is some data"; + + final RELPResponse response = new RELPResponse(txnr, code, null, data); + + final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = code + "\n" + data; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testResponseToFrameNoData() throws IOException { + final long txnr = 123456789; + final int code = RELPResponse.OK; + final String message = "this is a message"; + + final RELPResponse response = new RELPResponse(txnr, code, message, null); + + final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = code + " " + message; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testResponseToFrameNoDataNoMessage() throws IOException { + final long txnr = 123456789; + final int code = RELPResponse.OK; + + final RELPResponse response = new RELPResponse(txnr, code); + + final RELPFrame frame = response.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = code + ""; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testCreateOpenResponse() { + final long txnr = 123456789; + + final Map<String,String> offers = new HashMap<>(); + offers.put("key1", "val1"); + offers.put("key2", "val2"); + + final RELPResponse openResponse = RELPResponse.open(txnr, offers); + + final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected1 = RELPResponse.OK + " OK\n" + "key1=val1\nkey2=val2"; + final String expected2 = RELPResponse.OK + " OK\n" + "key2=val2\nkey1=val1"; + Assert.assertTrue(result.equals(expected1) || result.equals(expected2)); + Assert.assertEquals(expected1.length(), frame.getDataLength()); + } + + @Test + public void testCreateOpenResponseNoOffers() { + final long txnr = 123456789; + final Map<String,String> offers = new HashMap<>(); + + final RELPResponse openResponse = RELPResponse.open(txnr, offers); + + final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = RELPResponse.OK + " OK\n"; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testCreateOkResponse() { + final long txnr = 123456789; + final RELPResponse openResponse = RELPResponse.ok(txnr); + + final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = RELPResponse.OK + " OK"; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } + + @Test + public void testCreateErrorResponse() { + final long txnr = 123456789; + final RELPResponse openResponse = RELPResponse.error(txnr); + + final RELPFrame frame = openResponse.toFrame(StandardCharsets.UTF_8); + Assert.assertEquals(txnr, frame.getTxnr()); + Assert.assertEquals(RELPResponse.RSP_CMD, frame.getCommand()); + + final String result = new String(frame.getData(), StandardCharsets.UTF_8); + final String expected = RELPResponse.ERROR + " ERROR"; + Assert.assertEquals(expected, result); + Assert.assertEquals(expected.length(), frame.getDataLength()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java index d1b1bbf..24fd59e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSyslogParser.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.standard.util; +import org.apache.nifi.processors.standard.syslog.SyslogEvent; +import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.junit.Assert; import org.junit.Before; import org.junit.Test;
