Repository: nifi
Updated Branches:
  refs/heads/0.x d01f2d910 -> 258fa660d


http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java
new file mode 100644
index 0000000..2ace367
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/response/LumberjackResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.response;
+
+import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
+
+import java.nio.ByteBuffer;
+
+/**
+ 'ack' frame type
+
+ SENT FROM READER ONLY
+ frame type value: ASCII 'A' aka byte value 0x41
+
+ Payload:
+ 32bit unsigned sequence number.
+
+ */
+public class LumberjackResponse {
+    private final long seqNumber;
+    final private byte version = 0x31, frameType = 0x41;
+
+
+
+    public LumberjackResponse(final long seqNumber) {
+        this.seqNumber = seqNumber;
+    }
+
+    /**
+     * Creates a LumberjackFrame where the data portion will contain this response.
+     *
+     *
+     * @return a LumberjackFrame for this response
+     */
+    public LumberjackFrame toFrame() {
+
+        return new LumberjackFrame.Builder()
+                .version(version)
+                .frameType(frameType)
+                .payload(ByteBuffer.allocate(8).putLong(seqNumber).array())
+                .build();
+    }
+
+    public static LumberjackResponse ok(final long seqNumber) {
+        return new LumberjackResponse(seqNumber);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..3c23391
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.lumberjack.ListenLumberjack

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java
new file mode 100644
index 0000000..7b239bb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/event/TestLumberjackEventFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.lumberjack.event;
+
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import 
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestLumberjackEventFactory {
+
+    @Test
+    public void testCreateLumberJackEvent() {
+        final String sender = "testsender1";
+        final byte[] data = "this is a test line".getBytes();
+        final long seqNumber = 1;
+        final String fields = "{\"file\":\"test\"}";
+
+
+        final Map<String,String> metadata = new HashMap<>();
+        metadata.put(EventFactory.SENDER_KEY, sender);
+        metadata.put(LumberjackMetadata.SEQNUMBER_KEY, 
String.valueOf(seqNumber));
+        metadata.put(LumberjackMetadata.FIELDS_KEY, String.valueOf(fields));
+
+        final ChannelResponder responder = new SocketChannelResponder(null);
+
+        final EventFactory<LumberjackEvent> factory = new 
LumberjackEventFactory();
+
+        final LumberjackEvent event = factory.create(data, metadata, 
responder);
+
+        Assert.assertEquals(sender, event.getSender());
+        Assert.assertEquals(seqNumber, event.getSeqNumber());
+        Assert.assertEquals(fields, event.getFields());
+        Assert.assertEquals(data, event.getData());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java
new file mode 100644
index 0000000..f7b42e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackDecoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.frame;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestLumberjackDecoder {
+
+    // No encoder exists for frame type 0x43 (compressed), so static hex
+    // representations of compressed data are used as test input instead.
+    //
+    private static final String singleFrameData = 
"3143000000aa785e4c8e4daac3300c8413c8cbfeddc017681da7b48540775df51245103936f54fb"
 +
+        
"04c4a6e5f6917d03020e91bc93c9ba669597faccefa80ec0fed72440dd1174833e819370c798d98aa0e79a10ae44e36972f94198b26886bc"
 +
+        
"0774422589024c865aaecff07f24c6e1b0c37fb6c2da18cdb4176834f72747c4152e6aa46330db7e9725707567db0240c93aace93e212464"
 +
+        "95857f755e89e76e2d77e000000ffff010000ffff05b43bb8";
+    private static final String multiFrameData = 
"3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728031957a97f82"
 +
+        
"232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f021f71461b26873e711bee9480f48b0af10fe28"
 +
+        
"89113b8c9e28f4322b82395413a50cafd79957c253d0b992faf4129c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f468079"
 +
+        
"6b421964fc9b032ac4dcb54d2575a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff"
 +
+        "010000ffff35e0eff0";
+    private static final String payload = "";
+
+    private LumberjackDecoder decoder;
+
+    @Before
+    public void setup() {
+        this.decoder = new LumberjackDecoder(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testDecodeSingleFrame() {
+        final byte[] input = DatatypeConverter.parseHexBinary(singleFrameData);
+
+        List<LumberjackFrame> frames = null;
+        LumberjackFrame frame = null;
+
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frames = decoder.getFrames();
+                break;
+            }
+        }
+
+        frame = frames.get(frames.size() - 1);
+
+        Assert.assertNotNull(frame);
+        Assert.assertEquals(0x31, frame.getVersion());
+        Assert.assertEquals(0x44, frame.getFrameType());
+        Assert.assertEquals(1, frame.getSeqNumber());
+        // Compare the first 15 bytes of the decoded inner payload
+        
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"),
 Arrays.copyOfRange(frame.getPayload(), 0, 15));
+    }
+
+    @Test
+    public void testDecodeMultipleFrame() {
+        final byte[] input = DatatypeConverter.parseHexBinary(multiFrameData);
+
+        List<LumberjackFrame> frames = null;
+        LumberjackFrame frame = null;
+
+        for (byte b : input) {
+            if (decoder.process(b)) {
+                frames = decoder.getFrames();
+                break;
+            }
+        }
+
+        frame = frames.get(1);
+
+        Assert.assertNotNull(frame);
+        Assert.assertEquals(0x31, frame.getVersion());
+        Assert.assertEquals(0x44, frame.getFrameType());
+        // Load the second frame therefore seqNumber = 2
+        Assert.assertEquals(2, frame.getSeqNumber());
+        // Compare the first 15 bytes of the decoded inner payload
+        
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("000000050000000466696c65000000"),
 Arrays.copyOfRange(frame.getPayload(), 0, 15));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java
new file mode 100644
index 0000000..aa33fe0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackEncoder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.frame;
+
+import java.nio.ByteBuffer;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestLumberjackEncoder {
+    private LumberjackEncoder encoder;
+
+
+    @Before
+    public void setup() {
+        this.encoder = new LumberjackEncoder();
+    }
+
+    @Test
+    public void testEncode() {
+        LumberjackFrame frame = new LumberjackFrame.Builder()
+            .version((byte) 0x31)
+            .frameType((byte) 0x41)
+            .payload(ByteBuffer.allocate(8).putLong(123).array())
+            .build();
+
+        byte[] encoded = encoder.encode(frame);
+
+        
Assert.assertArrayEquals(DatatypeConverter.parseHexBinary("3141000000000000007B"),
 encoded);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java
new file mode 100644
index 0000000..44fecb6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/frame/TestLumberjackFrame.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.frame;
+
+import org.junit.Test;
+
+
+public class TestLumberjackFrame {
+
+    @Test(expected = LumberjackFrameException.class)
+    public void testInvalidVersion() {
+        new LumberjackFrame.Builder().seqNumber(1234).dataSize(3).build();
+    }
+
+    @Test(expected = LumberjackFrameException.class)
+    public void testInvalidFrameType() {
+        new LumberjackFrame.Builder().frameType((byte) 
0x70).dataSize(5).build();
+    }
+
+    @Test(expected = LumberjackFrameException.class)
+    public void testBlankFrameType() {
+        new LumberjackFrame.Builder().frameType(((byte) 
0x00)).dataSize(5).build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java
new file mode 100644
index 0000000..253fa18
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackFrameHandler.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.handler;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
+import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
+import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
+import org.apache.nifi.processors.lumberjack.frame.LumberjackFrame;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestLumberjackFrameHandler {
+    private Charset charset;
+    private EventFactory<LumberjackEvent> eventFactory;
+    private BlockingQueue<LumberjackEvent> events;
+    private SelectionKey key;
+    private AsyncChannelDispatcher dispatcher;
+    private LumberjackEncoder encoder;
+
+    private ProcessorLog logger;
+
+    private LumberjackFrameHandler<LumberjackEvent> frameHandler;
+
+    @Before
+    public void setup() {
+        this.charset = StandardCharsets.UTF_8;
+        this.eventFactory = new LumberjackEventFactory();
+        this.events = new LinkedBlockingQueue<>();
+        this.key = Mockito.mock(SelectionKey.class);
+        this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
+        this.logger = Mockito.mock(ProcessorLog.class);
+
+        this.frameHandler = new LumberjackFrameHandler<>(key, charset, 
eventFactory, events, dispatcher, logger);
+    }
+
+    @Test
+    public void testWindow() throws IOException, InterruptedException {
+        final LumberjackFrame openFrame = new LumberjackFrame.Builder()
+            .version((byte) 0x31)
+            .frameType((byte) 0x57)
+            .seqNumber(-1)
+            .payload(Integer.toString(1).getBytes())
+            .build();
+
+
+        final String sender = "sender1";
+        final CapturingChannelResponder responder = new 
CapturingChannelResponder();
+
+        // call the handler; a window frame must not trigger a response
+        frameHandler.handle(openFrame, responder, sender);
+
+        // No response expected
+        Assert.assertEquals(0, responder.responded);
+    }
+
+
+    @Test
+    public void testData() throws IOException, InterruptedException {
+        final byte payload[] = new byte[]{
+            0x00, 0x00, 0x00, 0x02,  // Number of pairs
+            0x00, 0x00, 0x00, 0x04,  // Length of first pair key ('line')
+            0x6C, 0x69, 0x6E, 0x65, // 'line'
+            0x00, 0x00, 0x00, 0x0C, // Length of 'test-content'
+            0x74, 0x65, 0x73, 0x74, //
+            0x2d, 0x63, 0x6f, 0x6e, // 'test-content'
+            0x74, 0x65, 0x6e, 0x74, //
+            0x00, 0x00, 0x00, 0x05, // Length of 2nd pair key (field)
+            0x66, 0x69, 0x65, 0x6c, //
+            0x64,                               // 'field'
+            0x00, 0x00, 0x00, 0x05, // Length of 'value'
+            0x76, 0x61, 0x6c, 0x75, // 'value'
+            0x65
+        };
+
+        final LumberjackFrame dataFrame = new LumberjackFrame.Builder()
+            .version((byte) 0x31)
+            .frameType((byte) 0x44)
+            .seqNumber(1)
+            // Payload eq { line: test-content, field: value }
+            .payload(payload)
+            .build();
+
+
+        final String sender = "sender1";
+        final CapturingChannelResponder responder = new 
CapturingChannelResponder();
+
+        // call the handler; respond() must not be called for this data frame
+        frameHandler.handle(dataFrame, responder, sender);
+
+        // No response expected
+        Assert.assertEquals(0, responder.responded);
+        // But events should contain one event
+        Assert.assertEquals(1, events.size());
+
+        final LumberjackEvent event = events.poll();
+        Assert.assertEquals("{\"field\":\"value\"}", new 
String((event.getFields())));
+        Assert.assertEquals("test-content", new String(event.getData(), 
charset));
+    }
+
+    private static class CapturingChannelResponder implements 
ChannelResponder<SocketChannel> {
+
+        int responded;
+        List<ChannelResponse> responses = new ArrayList<>();
+
+        @Override
+        public SocketChannel getChannel() {
+            return Mockito.mock(SocketChannel.class);
+        }
+
+        @Override
+        public List<ChannelResponse> getResponses() {
+            return responses;
+        }
+
+        @Override
+        public void addResponse(ChannelResponse response) {
+            responses.add(response);
+        }
+
+        @Override
+        public void respond() throws IOException {
+            responded++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
new file mode 100644
index 0000000..c29ba65
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/TestLumberjackSocketChannelHandler.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.lumberjack.handler;
+
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processors.lumberjack.event.LumberjackMetadata;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.net.ssl.SSLContext;
+import javax.xml.bind.DatatypeConverter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+
+
+public class TestLumberjackSocketChannelHandler {
+    private EventFactory<TestEvent> eventFactory;
+    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> 
channelHandlerFactory;
+    private BlockingQueue<ByteBuffer> byteBuffers;
+    private BlockingQueue<TestEvent> events;
+    private ProcessorLog logger = Mockito.mock(ProcessorLog.class);
+    private int maxConnections;
+    private SSLContext sslContext;
+    private Charset charset;
+    private ChannelDispatcher dispatcher;
+
+    @Before
+    public void setup() {
+        eventFactory = new TestEventHolderFactory();
+        channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
+
+        byteBuffers = new LinkedBlockingQueue<>();
+        byteBuffers.add(ByteBuffer.allocate(4096));
+
+        events = new LinkedBlockingQueue<>();
+        logger = Mockito.mock(ProcessorLog.class);
+
+        maxConnections = 1;
+        sslContext = null;
+        charset = StandardCharsets.UTF_8;
+
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBuffers, events, logger,
+                maxConnections, sslContext, charset);
+
+    }
+
+    @Test
+    public void testBasicHandling() throws IOException, InterruptedException {
+        final String multiFrameData = 
"3143000000d7785ec48fcf6ac4201087b3bbe9defb06be40ab669b1602bdf5d49728" +
+                
"031957a97f82232979fbaaa7c0924b2e018701f537f37df2ab699a53aea75cad321673ffe43a38e4e04c043f02"
 +
+                
"1f71461b26873e711bee9480f48b0af10fe2889113b8c9e28f4322b82395413a50cafd79957c253d0b992faf41"
 +
+                
"29c2f27c12e5af35be2cedbec133d9b34e0ee27db87db05596fd62f4680796b421964fc9b032ac4dcb54d2575"
 +
+                
"a28a3559df3413ae7be12edf6e9367c2e07f95ca4a848bb856e1b42ed61427d45da2df4f628f40f0000ffff01000"
 +
+                "0ffff35e0eff0";
+        final List<String> messages = new ArrayList<>();
+        messages.add(multiFrameData);
+
+        run(messages);
+
+        // Check that the 4 frames (from the hex string above) came back...
+        Assert.assertEquals(4, events.size());
+
+        boolean found1 = false;
+        boolean found2 = false;
+        boolean found3 = false;
+        boolean found4 = false;
+
+        TestEvent event;
+        while((event = events.poll()) != null) {
+            Map<String,String> metadata = event.metadata;
+            
Assert.assertTrue(metadata.containsKey(LumberjackMetadata.SEQNUMBER_KEY));
+
+            final String seqNum = 
metadata.get(LumberjackMetadata.SEQNUMBER_KEY);
+            if (seqNum.equals("1")) {
+                found1 = true;
+            } else if (seqNum.equals("2")) {
+                found2 = true;
+            } else if (seqNum.equals("3")) {
+                found3 = true;
+            } else if (seqNum.equals("4")) {
+                found4 = true;
+            }
+        }
+
+        Assert.assertTrue(found1);
+        Assert.assertTrue(found2);
+        Assert.assertTrue(found3);
+        Assert.assertTrue(found4);
+    }
+
+    protected void run(List<String> messages) throws IOException, 
InterruptedException {
+        final ByteBuffer buffer = ByteBuffer.allocate(1024);
+        try {
+            // starts the dispatcher listening on port 0 so it selects a 
random port
+            dispatcher.open(null, 0, 4096);
+
+            // starts a thread to run the dispatcher which will accept/read 
connections
+            Thread dispatcherThread = new Thread(dispatcher);
+            dispatcherThread.start();
+
+
+            // create a client connection to the port the dispatcher is 
listening on
+            final int realPort = dispatcher.getPort();
+            try (SocketChannel channel = SocketChannel.open()) {
+                channel.connect(new InetSocketAddress("localhost", realPort));
+                Thread.sleep(100);
+
+                // send the provided messages
+                for (int i=0; i < messages.size(); i++) {
+                    buffer.clear();
+                    
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
+                    buffer.flip();
+
+                    while (buffer.hasRemaining()) {
+                        channel.write(buffer);
+                    }
+                    Thread.sleep(1);
+                }
+            }
+
+            // wait up to 10 seconds to verify the responses
+            long timeout = 10000;
+            long startTime = System.currentTimeMillis();
+            while (events.size() < messages.size() && 
(System.currentTimeMillis() - startTime < timeout)) {
+                Thread.sleep(100);
+            }
+
+            // should have gotten an event for each of the 4 frames sent
+            Assert.assertEquals(4, events.size());
+
+        } finally {
+            // stop the dispatcher thread and ensure we shut down handler 
threads
+            dispatcher.close();
+        }
+    }
+
+    // Test event to produce from the data
+    private static class TestEvent implements Event<SocketChannel> {
+
+        private byte[] data;
+        private Map<String, String> metadata;
+
+        public TestEvent(byte[] data, Map<String, String> metadata) {
+            this.data = data;
+            this.metadata = metadata;
+        }
+
+        @Override
+        public String getSender() {
+            return metadata.get(EventFactory.SENDER_KEY);
+        }
+
+        @Override
+        public byte[] getData() {
+            return data;
+        }
+
+        @Override
+        public ChannelResponder<SocketChannel> getResponder() {
+            return null;
+        }
+    }
+
+    // Factory to create test events and send responses for testing
+    private static class TestEventHolderFactory implements 
EventFactory<TestEvent> {
+
+        @Override
+        public TestEvent create(final byte[] data, final Map<String, String> 
metadata, final ChannelResponder responder) {
+            return new TestEvent(data, metadata);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml 
b/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml
new file mode 100644
index 0000000..86c25b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-lumberjack-bundle/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-lumberjack-bundle</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-lumberjack-processors</module>
+        <module>nifi-lumberjack-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 558dc7c..b2395cd 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -57,6 +57,7 @@
         <module>nifi-amqp-bundle</module>
            <module>nifi-splunk-bundle</module>
         <module>nifi-jms-bundle</module>
+        <module>nifi-lumberjack-bundle</module>
         <module>nifi-cassandra-bundle</module>
         <module>nifi-spring-bundle</module>
         <module>nifi-hive-bundle</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/258fa660/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3a0a73a..80b4b28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1107,6 +1107,12 @@ language governing permissions and limitations under the 
License. -->
                 <version>0.7.0-SNAPSHOT</version>
                <type>nar</type>
            </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lumberjack-nar</artifactId>
+            <version>0.7.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-splunk-nar</artifactId>

Reply via email to