Repository: nifi Updated Branches: refs/heads/0.x d01f2d910 -> 258fa660d
http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java new file mode 100644 index 0000000..2ace367 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.response; + +import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; + +import java.nio.ByteBuffer; + +/** + 'ack' frame type + + SENT FROM READER ONLY + frame type value: ASCII 'A' aka byte value 0x41 + + Payload: + 32bit unsigned sequence number. + + */ +public class LumberjackResponse { + private final long seqNumber; + final private byte version = 0x31, frameType = 0x41; + + + + public LumberjackResponse(final long seqNumber) { + this.seqNumber = seqNumber; + } + + /** + * Creates a LumberjackFrame where the data portion will contain this response. + * + * + * @return a LumberjackFrame for for this response + */ + public LumberjackFrame toFrame() { + + return new LumberjackFrame.Builder() + .version(version) + .frameType(frameType) + .payload(ByteBuffer.allocate(8).putLong(seqNumber).array()) + .build(); + } + + public static LumberjackResponse ok(final long seqNumber) { + return new LumberjackResponse(seqNumber); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..3c23391 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.lumberjack.ListenLumberjack http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java new file mode 100644 index 0000000..7b239bb --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.lumberjack.event; + +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; +import org.junit.Assert; +import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +public class TestLumberjackEventFactory { + + @Test + public void testCreateLumberJackEvent() { + final String sender = "testsender1"; + final byte[] data = "this is a test line".getBytes(); + final long seqNumber = 1; + final String fields = "{\"file\":\"test\"}"; + + + final Map<String,String> metadata = new HashMap<>(); + metadata.put(EventFactory.SENDER_KEY, sender); + metadata.put(LumberjackMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber)); + metadata.put(LumberjackMetadata.FIELDS_KEY, String.valueOf(fields)); + + final ChannelResponder responder = new SocketChannelResponder(null); + + final EventFactory<LumberjackEvent> factory = new LumberjackEventFactory(); + + final LumberjackEvent event = factory.create(data, metadata, responder); + + Assert.assertEquals(sender, event.getSender()); + Assert.assertEquals(seqNumber, event.getSeqNumber()); + Assert.assertEquals(fields, event.getFields()); + Assert.assertEquals(data, event.getData()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java new file mode 100644 index 0000000..f7b42e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.frame; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import javax.xml.bind.DatatypeConverter; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestLumberjackDecoder { + + // Because no encoder for type 43 was coded, added Static hex + // representation of compressed data + // + private static final String singleFrameData = "3143000000aa785e4c8e4daac3300c8413c8cbfeddc017681da7b48540775df51245103936f54fb" + + "04c4a6e5f6917d03020e91bc93c9ba669597faccefa80ec0fed72440dd1174833e819370c798d98aa0e79a10ae44e36972f94198b26886bc" + + "0774422589024c865aaecff07f24c6e1b0c37fb6c2da18cdb4176834f72747c4152e6aa46330db7e9725707567db0240c93aace93e212464" + + "95857f755e89e76e2d77e000000ffff010000ffff05b43bb8"; + private static final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728031957a97f82" + + "232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f021f71461b26873e711bee9480f48b0af10fe28" + + "89113b8c9e28f4322b82395413a50cafd79957c253d0b992faf4129c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f468079" + + "6b421964fc9b032ac4dcb54d2575a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff" + + "010000ffff35e0eff0"; + private static final String payload = ""; + + private LumberjackDecoder decoder; + + @Before + public void setup() { + this.decoder = new LumberjackDecoder(StandardCharsets.UTF_8); + } + + @Test + public void testDecodeSingleFrame() { + final byte[] input = DatatypeConverter.parseHexBinary(singleFrameData); + + List<LumberjackFrame> frames = null; + LumberjackFrame frame = null; + + for (byte b : input) { + if (decoder.process(b)) { + frames = decoder.getFrames(); + break; + } + } + + frame = frames.get(frames.size() - 1); + + Assert.assertNotNull(frame); + Assert.assertEquals(0x31, frame.getVersion()); + Assert.assertEquals(0x44, frame.getFrameType()); + Assert.assertEquals(1, frame.getSeqNumber()); + // Look for a predefined number of bytes for matching of the inner payload + Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15)); + } + + @Test + public void testDecodeMultipleFrame() { + final byte[] input = DatatypeConverter.parseHexBinary(multiFrameData); + + List<LumberjackFrame> frames = null; + LumberjackFrame frame = null; + + for (byte b : input) { + if (decoder.process(b)) { + frames = decoder.getFrames(); + break; + } + } + + frame = frames.get(1); + + Assert.assertNotNull(frame); + Assert.assertEquals(0x31, frame.getVersion()); + Assert.assertEquals(0x44, frame.getFrameType()); + // Load the second frame therefore seqNumber = 2 + Assert.assertEquals(2, frame.getSeqNumber()); + // Look for a predefined number of bytes for matching of the inner payload + Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"), Arrays.copyOfRange(frame.getPayload(), 0, 15)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java new file mode 100644 index 0000000..aa33fe0 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.frame; + +import java.nio.ByteBuffer; + +import javax.xml.bind.DatatypeConverter; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestLumberjackEncoder { + private LumberjackEncoder encoder; + + + @Before + public void setup() { + this.encoder = new LumberjackEncoder(); + } + + @Test + public void testEncode() { + LumberjackFrame frame = new LumberjackFrame.Builder() + .version((byte) 0x31) + .frameType((byte) 0x41) + .payload(ByteBuffer.allocate(8).putLong(123).array()) + .build(); + + byte[] encoded = encoder.encode(frame); + + Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("3141000000000000007B"), encoded); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java new file mode 100644 index 0000000..44fecb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.frame; + +import org.junit.Test; + + +public class TestLumberjackFrame { + + @Test(expected = LumberjackFrameException.class) + public void testInvalidVersion() { + new LumberjackFrame.Builder().seqNumber(1234).dataSize(3).build(); + } + + @Test(expected = LumberjackFrameException.class) + public void testInvalidFrameType() { + new LumberjackFrame.Builder().frameType((byte) 0x70).dataSize(5).build(); + } + + @Test(expected = LumberjackFrameException.class) + public void testBlankFrameType() { + new LumberjackFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java new file mode 100644 index 0000000..253fa18 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.handler; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.lumberjack.event.LumberjackEvent; +import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory; +import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; +import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + +public class TestLumberjackFrameHandler { + private Charset charset; + private EventFactory<LumberjackEvent> eventFactory; + private BlockingQueue<LumberjackEvent> events; + private SelectionKey key; + private AsyncChannelDispatcher dispatcher; + private LumberjackEncoder encoder; + + private ProcessorLog logger; + + private LumberjackFrameHandler<LumberjackEvent> frameHandler; + + @Before + public void setup() { + this.charset = StandardCharsets.UTF_8; + this.eventFactory = new LumberjackEventFactory(); + this.events = new LinkedBlockingQueue<>(); + this.key = Mockito.mock(SelectionKey.class); + this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class); + this.logger = Mockito.mock(ProcessorLog.class); + + this.frameHandler = new LumberjackFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger); + } + + @Test + public void testWindow() throws IOException, InterruptedException { + final LumberjackFrame openFrame = new LumberjackFrame.Builder() + .version((byte) 0x31) + .frameType((byte) 0x57) + .seqNumber(-1) + .payload(Integer.toString(1).getBytes()) + .build(); + + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler and verify respond() was called once with once response + frameHandler.handle(openFrame, responder, sender); + + // No response expected + Assert.assertEquals(0, responder.responded); + } + + + @Test + public void testData() throws IOException, InterruptedException { + final byte payload[] = new byte[]{ + 0x00, 0x00, 0x00, 0x02, // Number of pairs + 0x00, 0x00, 0x00, 0x04, // Length of first pair key ('line') + 0x6C, 0x69, 0x6E, 0x65, // 'line' + 0x00, 0x00, 0x00, 0x0C, // Lenght of 'test-content' + 0x74, 0x65, 0x73, 0x74, // + 0x2d, 0x63, 0x6f, 0x6e, // 'test-content' + 0x74, 0x65, 0x6e, 0x74, // + 0x00, 0x00, 0x00, 0x05, // Length of 2nd pair key (field) + 0x66, 0x69, 0x65, 0x6c, // + 0x64, // 'field' + 0x00, 0x00, 0x00, 0x05, // Length of 'value' + 0x76, 0x61, 0x6c, 0x75, // 'value' + 0x65 + }; + + final LumberjackFrame dataFrame = new LumberjackFrame.Builder() + .version((byte) 0x31) + .frameType((byte) 0x44) + .seqNumber(1) + // Payload eq { enil: hello } + .payload(payload) + .build(); + + + final String sender = "sender1"; + final CapturingChannelResponder responder = new CapturingChannelResponder(); + + // call the handler and verify respond() was called once with once response + frameHandler.handle(dataFrame, responder, sender); + + // No response expected + Assert.assertEquals(0, responder.responded); + // But events should contain one event + Assert.assertEquals(1, events.size()); + + final LumberjackEvent event = events.poll(); + Assert.assertEquals("{\"field\":\"value\"}", new String((event.getFields()))); + Assert.assertEquals("test-content", new String(event.getData(), charset)); + } + + private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> { + + int responded; + List<ChannelResponse> responses = new ArrayList<>(); + + @Override + public SocketChannel getChannel() { + return Mockito.mock(SocketChannel.class); + } + + @Override + public List<ChannelResponse> getResponses() { + return responses; + } + + @Override + public void addResponse(ChannelResponse response) { + responses.add(response); + } + + @Override + public void respond() throws IOException { + responded++; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java new file mode 100644 index 0000000..c29ba65 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.net.ssl.SSLContext; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + + + + +public class TestLumberjackSocketChannelHandler { + private EventFactory<TestEvent> eventFactory; + private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory; + private BlockingQueue<ByteBuffer> byteBuffers; + private BlockingQueue<TestEvent> events; + private ProcessorLog logger = Mockito.mock(ProcessorLog.class); + private int maxConnections; + private SSLContext sslContext; + private Charset charset; + private ChannelDispatcher dispatcher; + + @Before + public void setup() { + eventFactory = new TestEventHolderFactory(); + channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>(); + + byteBuffers = new LinkedBlockingQueue<>(); + byteBuffers.add(ByteBuffer.allocate(4096)); + + events = new LinkedBlockingQueue<>(); + logger = Mockito.mock(ProcessorLog.class); + + maxConnections = 1; + sslContext = null; + charset = StandardCharsets.UTF_8; + + dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBuffers, events, logger, + maxConnections, sslContext, charset); + + } + + @Test + public void testBasicHandling() throws IOException, InterruptedException { + final String multiFrameData = "3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" + + "031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02" + + "1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41" + + "29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575" + + "a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000" + + "0ffff35e0eff0"; + final List<String> messages = new ArrayList<>(); + messages.add(multiFrameData); + + run(messages); + + // Check for the 4 frames (from the hex string above) are back... + Assert.assertEquals(4, events.size()); + + boolean found1 = false; + boolean found2 = false; + boolean found3 = false; + boolean found4 = false; + + TestEvent event; + while((event = events.poll()) != null) { + Map<String,String> metadata = event.metadata; + Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY)); + + final String seqNum = metadata.get(LumberjackMetadata.SEQNUMBER_KEY); + if (seqNum.equals("1")) { + found1 = true; + } else if (seqNum.equals("2")) { + found2 = true; + } else if (seqNum.equals("3")) { + found3 = true; + } else if (seqNum.equals("4")) { + found4 = true; + } + } + + Assert.assertTrue(found1); + Assert.assertTrue(found2); + Assert.assertTrue(found3); + Assert.assertTrue(found4); + } + + protected void run(List<String> messages) throws IOException, InterruptedException { + final ByteBuffer buffer = ByteBuffer.allocate(1024); + try { + // starts the dispatcher listening on port 0 so it selects a random port + dispatcher.open(null, 0, 4096); + + // starts a thread to run the dispatcher which will accept/read connections + Thread dispatcherThread = new Thread(dispatcher); + dispatcherThread.start(); + + + // create a client connection to the port the dispatcher is listening on + final int realPort = dispatcher.getPort(); + try (SocketChannel channel = SocketChannel.open()) { + channel.connect(new InetSocketAddress("localhost", realPort)); + Thread.sleep(100); + + // send the provided messages + for (int i=0; i < messages.size(); i++) { + buffer.clear(); + buffer.put(DatatypeConverter.parseHexBinary(messages.get(i))); + buffer.flip(); + + while (buffer.hasRemaining()) { + channel.write(buffer); + } + Thread.sleep(1); + } + } + + // wait up to 10 seconds to verify the responses + long timeout = 10000; + long startTime = System.currentTimeMillis(); + while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) { + Thread.sleep(100); + } + + // should have gotten an event for each message sent + Assert.assertEquals(4, events.size()); + + } finally { + // stop the dispatcher thread and ensure we shut down handler threads + dispatcher.close(); + } + } + + // Test event to produce from the data + private static class TestEvent implements Event<SocketChannel> { + + private byte[] data; + private Map<String, String> metadata; + + public TestEvent(byte[] data, Map<String, String> metadata) { + this.data = data; + this.metadata = metadata; + } + + @Override + public String getSender() { + return metadata.get(EventFactory.SENDER_KEY); + } + + @Override + public byte[] getData() { + return data; + } + + @Override + public ChannelResponder<SocketChannel> getResponder() { + return null; + } + } + + // Factory to create test events and send responses for testing + private static class TestEventHolderFactory implements EventFactory<TestEvent> { + + @Override + public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { + return new TestEvent(data, metadata); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml b/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml new file mode 100644 index 0000000..86c25b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-lumberjack-bundle</artifactId> + <version>0.7.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-lumberjack-processors</module> + <module>nifi-lumberjack-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 558dc7c..b2395cd 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -57,6 +57,7 @@ <module>nifi-amqp-bundle</module> <module>nifi-splunk-bundle</module> <module>nifi-jms-bundle</module> + <module>nifi-lumberjack-bundle</module> <module>nifi-cassandra-bundle</module> <module>nifi-spring-bundle</module> <module>nifi-hive-bundle</module> http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3a0a73a..80b4b28 100644 --- a/pom.xml +++ b/pom.xml @@ -1107,6 +1107,12 @@ language governing permissions and limitations under the License. --> <version>0.7.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-lumberjack-nar</artifactId> + <version>0.7.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-splunk-nar</artifactId>
