Repository: nifi
Updated Branches:
  refs/heads/NIFI-964 2382841e7 -> af7b4b166


NIFI-964 Adding unit tests for GetPcap


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af7b4b16
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af7b4b16
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af7b4b16

Branch: refs/heads/NIFI-964
Commit: af7b4b16681bb3809706e8912cc5638cf6642dbc
Parents: 2382841
Author: Bryan Bende <[email protected]>
Authored: Thu Sep 17 15:31:34 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Thu Sep 17 15:31:34 2015 -0400

----------------------------------------------------------------------
 .../nifi-pcap-processors/pom.xml                | 12 +++
 .../apache/nifi/processors/pcap/GetPcap.java    | 33 +++++--
 .../nifi/processors/pcap/TestGetPcap.java       | 99 ++++++++++++++++++--
 .../java/org/pcap4j/packet/ExceptionPacket.java | 36 +++++++
 .../test/java/org/pcap4j/packet/TestPacket.java | 43 +++++++++
 5 files changed, 207 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
index 0907b98..e4498a8 100644
--- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
@@ -60,5 +60,17 @@
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
index 599a2d6..754bcea 100644
--- 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
+++ 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
@@ -153,12 +153,12 @@ public class GetPcap extends AbstractProcessor {
         final Long timePeriod = 
context.getProperty(READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
         final int readTimeout = (timePeriod == null ? 0 : 
timePeriod.intValue());
 
-        final PcapNetworkInterface nif = Pcaps.getDevByName(interfaceName);
+        final PcapNetworkInterface nif = getDevByName(interfaceName);
         if (nif == null) {
             throw new IllegalStateException("Unable to locate Network 
Interface " + interfaceName);
         }
 
-        final PcapHandle handle = nif.openLive(snapLen, 
PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout);
+        final PcapHandle handle = getPcapHandle(mode, snapLen, readTimeout, 
nif);
         if (filter != null && filter.length() > 0) {
             handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE);
         }
@@ -167,12 +167,7 @@ public class GetPcap extends AbstractProcessor {
             @Override
             public void run() {
                 try {
-                    handle.loop(-1, new PacketListener() {
-                        @Override
-                        public void gotPacket(Packet packet) {
-                            packetQueue.offer(packet);
-                        }
-                    });
+                    handle.loop(-1, createPacketListener(packetQueue));
                     getLogger().info("PcapListener shutting down...");
                 } catch (PcapNativeException e) {
                     getLogger().error("Error calling pcap native libraries", 
e);
@@ -193,6 +188,24 @@ public class GetPcap extends AbstractProcessor {
         pcapHandle.set(handle);
     }
 
+    protected PcapNetworkInterface getDevByName(final String interfaceName) 
throws PcapNativeException {
+        return Pcaps.getDevByName(interfaceName);
+    }
+
+    protected PcapHandle getPcapHandle(final String mode, final int snapLen, 
final int readTimeout, final PcapNetworkInterface nif)
+            throws PcapNativeException {
+        return nif.openLive(snapLen, 
PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout);
+    }
+
+    protected PacketListener createPacketListener(final BlockingQueue<Packet> 
packetQueue) {
+        return new PacketListener() {
+            @Override
+            public void gotPacket(Packet packet) {
+                packetQueue.offer(packet);
+            }
+        };
+    }
+
     @OnUnscheduled
     public void onUnscheduled() {
         final PcapHandle handle = pcapHandle.get();
@@ -221,9 +234,8 @@ public class GetPcap extends AbstractProcessor {
             return;
         }
 
+        FlowFile flowFile = session.create();
         try {
-            FlowFile flowFile = session.create();
-
             final Packet.Header header = packet.getHeader();
             if (header != null) {
                 flowFile = session.putAttribute(flowFile, PACKET_HEADER_ATTR, 
packet.getHeader().toString());
@@ -245,6 +257,7 @@ public class GetPcap extends AbstractProcessor {
         } catch (ProcessException pe) {
             getLogger().error("Error processing packet", pe);
             packetQueue.offer(packet); // requeue the packet so we try again
+            session.remove(flowFile);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java
 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java
index 752122b..02a6c69 100644
--- 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java
+++ 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java
@@ -16,24 +16,111 @@
  */
 package org.apache.nifi.processors.pcap;
 
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.pcap4j.core.PacketListener;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.packet.ExceptionPacket;
+import org.pcap4j.packet.IllegalRawDataException;
+import org.pcap4j.packet.Packet;
+import org.pcap4j.packet.TestPacket;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
 
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({PcapNetworkInterface.class,PcapHandle.class})
 public class TestGetPcap {
 
-    private TestRunner testRunner;
+    @Test
+    public void testPacketPerFlowFile() throws UnsupportedEncodingException, 
IllegalRawDataException {
+        final Packet packet1 = new TestPacket("payload 
data1".getBytes("UTF-8"), 0, 13);
+        final Packet packet2 = new TestPacket("payload 
data2".getBytes("UTF-8"), 0, 13);
+
+        final List<Packet> packets = new ArrayList<>();
+        packets.add(packet1);
+        packets.add(packet2);
+
+        final TestableProcessor processor = new TestableProcessor(packets);
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(GetPcap.INTERFACE_NAME, "lo0");
 
-    @Before
-    public void init() {
-        testRunner = TestRunners.newTestRunner(GetPcap.class);
+        testRunner.run(2);
+        testRunner.assertAllFlowFilesTransferred(GetPcap.SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(GetPcap.SUCCESS);
+        final MockFlowFile flowFile1 = flowFiles.get(0);
+        final MockFlowFile flowFile2 = flowFiles.get(1);
+        assertEquals(new String(packet1.getRawData()), new 
String(flowFile1.toByteArray()));
+        assertEquals(new String(packet2.getRawData()), new 
String(flowFile2.toByteArray()));
+        assertEquals(0, processor.getPacketQueueSize());
     }
 
     @Test
-    public void testProcessor() {
+    public void testRequeueOnError() throws UnsupportedEncodingException {
+        final Packet packet1 = new ExceptionPacket("payload 
data1".getBytes("UTF-8"), 0, 13);
+
+        final List<Packet> packets = new ArrayList<>();
+        packets.add(packet1);
+
+        final TestableProcessor processor = new TestableProcessor(packets);
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(GetPcap.INTERFACE_NAME, "lo0");
+
+        testRunner.run();
+        testRunner.assertTransferCount(GetPcap.SUCCESS, 0);
+        assertEquals(1, processor.getPacketQueueSize());
+    }
+
+    // Use PowerMock to mock pcap4j classes that are final, and inject 
provided packets into the queue
+    private class TestableProcessor extends GetPcap {
+
+        final List<Packet> packets;
+        BlockingQueue<Packet> packetQueue;
+
+        public TestableProcessor(final List<Packet> packets) {
+            this.packets = packets;
+        }
+
+        @Override
+        protected PacketListener createPacketListener(final 
BlockingQueue<Packet> packetQueue) {
+            this.packetQueue = packetQueue;
+            for (Packet packet : packets) {
+                this.packetQueue.offer(packet);
+            }
+            return super.createPacketListener(packetQueue);
+        }
+
+        @Override
+        protected PcapHandle getPcapHandle(String mode, int snapLen, int 
readTimeout, PcapNetworkInterface nif)
+                throws PcapNativeException {
+            return PowerMockito.mock(PcapHandle.class);
+        }
+
+        @Override
+        protected PcapNetworkInterface getDevByName(String interfaceName) 
throws PcapNativeException {
+            return PowerMockito.mock(PcapNetworkInterface.class);
+        }
 
+        public int getPacketQueueSize() {
+            if (this.packetQueue == null) {
+                return 0;
+            } else {
+                return this.packetQueue.size();
+            }
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java
 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java
new file mode 100644
index 0000000..e77ef4d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.pcap4j.packet;
+
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class ExceptionPacket extends TestPacket {
+
+    public ExceptionPacket(byte[] rawData, int offset, int length) {
+        super(rawData, offset, length);
+    }
+
+    @Override
+    public Packet getPayload() {
+       return new TestPacket(this.getRawData(), 0, this.getRawData().length) {
+           @Override
+           public byte[] getRawData() {
+               throw new ProcessException("test exception");
+           }
+       };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java
 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java
new file mode 100644
index 0000000..3580dbf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.pcap4j.packet;
+
+/**
+ * A test Packet implementation that makes SimplePacket public.
+ */
+public class TestPacket extends SimplePacket {
+
+    public TestPacket(byte[] rawData, int offset, int length) {
+       super(rawData, offset, length);
+    }
+
+    @Override
+    protected String modifier() {
+        return null;
+    }
+
+    @Override
+    public Packet.Builder getBuilder() {
+        return null;
+    }
+
+    @Override
+    public Packet getPayload() {
+        return new TestPacket(this.getRawData(), 0, this.getRawData().length);
+    }
+
+}

Reply via email to