Repository: nifi Updated Branches: refs/heads/NIFI-964 2382841e7 -> af7b4b166
NIFI-964 Adding unit tests for GetPcap Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af7b4b16 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af7b4b16 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af7b4b16 Branch: refs/heads/NIFI-964 Commit: af7b4b16681bb3809706e8912cc5638cf6642dbc Parents: 2382841 Author: Bryan Bende <[email protected]> Authored: Thu Sep 17 15:31:34 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Thu Sep 17 15:31:34 2015 -0400 ---------------------------------------------------------------------- .../nifi-pcap-processors/pom.xml | 12 +++ .../apache/nifi/processors/pcap/GetPcap.java | 33 +++++-- .../nifi/processors/pcap/TestGetPcap.java | 99 ++++++++++++++++++-- .../java/org/pcap4j/packet/ExceptionPacket.java | 36 +++++++ .../test/java/org/pcap4j/packet/TestPacket.java | 43 +++++++++ 5 files changed, 207 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml index 0907b98..e4498a8 100644 --- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml @@ -60,5 +60,17 @@ <version>4.11</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>1.6.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>1.6.2</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java index 599a2d6..754bcea 100644 --- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java @@ -153,12 +153,12 @@ public class GetPcap extends AbstractProcessor { final Long timePeriod = context.getProperty(READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final int readTimeout = (timePeriod == null ? 0 : timePeriod.intValue()); - final PcapNetworkInterface nif = Pcaps.getDevByName(interfaceName); + final PcapNetworkInterface nif = getDevByName(interfaceName); if (nif == null) { throw new IllegalStateException("Unable to locate Network Interface " + interfaceName); } - final PcapHandle handle = nif.openLive(snapLen, PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout); + final PcapHandle handle = getPcapHandle(mode, snapLen, readTimeout, nif); if (filter != null && filter.length() > 0) { handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE); } @@ -167,12 +167,7 @@ public class GetPcap extends AbstractProcessor { @Override public void run() { try { - handle.loop(-1, new PacketListener() { - @Override - public void gotPacket(Packet packet) { - packetQueue.offer(packet); - } - }); + handle.loop(-1, createPacketListener(packetQueue)); getLogger().info("PcapListener shutting down..."); } catch (PcapNativeException e) { getLogger().error("Error calling pcap native libraries", e); @@ -193,6 +188,24 @@ public class GetPcap extends AbstractProcessor { pcapHandle.set(handle); } + protected PcapNetworkInterface getDevByName(final String interfaceName) throws PcapNativeException { + return Pcaps.getDevByName(interfaceName); + } + + protected PcapHandle getPcapHandle(final String mode, final int snapLen, final int readTimeout, final PcapNetworkInterface nif) + throws PcapNativeException { + return nif.openLive(snapLen, PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout); + } + + protected PacketListener createPacketListener(final BlockingQueue<Packet> packetQueue) { + return new PacketListener() { + @Override + public void gotPacket(Packet packet) { + packetQueue.offer(packet); + } + }; + } + @OnUnscheduled public void onUnscheduled() { final PcapHandle handle = pcapHandle.get(); @@ -221,9 +234,8 @@ public class GetPcap extends AbstractProcessor { return; } + FlowFile flowFile = session.create(); try { - FlowFile flowFile = session.create(); - final Packet.Header header = packet.getHeader(); if (header != null) { flowFile = session.putAttribute(flowFile, PACKET_HEADER_ATTR, packet.getHeader().toString()); @@ -245,6 +257,7 @@ public class GetPcap extends AbstractProcessor { } catch (ProcessException pe) { getLogger().error("Error processing packet", pe); packetQueue.offer(packet); // requeue the packet so we try again + session.remove(flowFile); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java index 752122b..02a6c69 100644 --- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/apache/nifi/processors/pcap/TestGetPcap.java @@ -16,24 +16,111 @@ */ package org.apache.nifi.processors.pcap; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.pcap4j.core.PacketListener; +import org.pcap4j.core.PcapHandle; +import org.pcap4j.core.PcapNativeException; +import org.pcap4j.core.PcapNetworkInterface; +import org.pcap4j.packet.ExceptionPacket; +import org.pcap4j.packet.IllegalRawDataException; +import org.pcap4j.packet.Packet; +import org.pcap4j.packet.TestPacket; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({PcapNetworkInterface.class,PcapHandle.class}) public class TestGetPcap { - private TestRunner testRunner; + @Test + public void testPacketPerFlowFile() throws UnsupportedEncodingException, IllegalRawDataException { + final Packet packet1 = new TestPacket("payload data1".getBytes("UTF-8"), 0, 13); + final Packet packet2 = new TestPacket("payload data2".getBytes("UTF-8"), 0, 13); + + final List<Packet> packets = new ArrayList<>(); + packets.add(packet1); + packets.add(packet2); + + final TestableProcessor processor = new TestableProcessor(packets); + final TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(GetPcap.INTERFACE_NAME, "lo0"); - @Before - public void init() { - testRunner = TestRunners.newTestRunner(GetPcap.class); + testRunner.run(2); + testRunner.assertAllFlowFilesTransferred(GetPcap.SUCCESS, 2); + + final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(GetPcap.SUCCESS); + final MockFlowFile flowFile1 = flowFiles.get(0); + final MockFlowFile flowFile2 = flowFiles.get(1); + assertEquals(new String(packet1.getRawData()), new String(flowFile1.toByteArray())); + assertEquals(new String(packet2.getRawData()), new String(flowFile2.toByteArray())); + assertEquals(0, processor.getPacketQueueSize()); } @Test - public void testProcessor() { + public void testRequeueOnError() throws UnsupportedEncodingException { + final Packet packet1 = new ExceptionPacket("payload data1".getBytes("UTF-8"), 0, 13); + + final List<Packet> packets = new ArrayList<>(); + packets.add(packet1); + + final TestableProcessor processor = new TestableProcessor(packets); + final TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(GetPcap.INTERFACE_NAME, "lo0"); + + testRunner.run(); + testRunner.assertTransferCount(GetPcap.SUCCESS, 0); + assertEquals(1, processor.getPacketQueueSize()); + } + + // Use PowerMock to mock pcap4j classes that are final, and inject provided packets into the queue + private class TestableProcessor extends GetPcap { + + final List<Packet> packets; + BlockingQueue<Packet> packetQueue; + + public TestableProcessor(final List<Packet> packets) { + this.packets = packets; + } + + @Override + protected PacketListener createPacketListener(final BlockingQueue<Packet> packetQueue) { + this.packetQueue = packetQueue; + for (Packet packet : packets) { + this.packetQueue.offer(packet); + } + return super.createPacketListener(packetQueue); + } + + @Override + protected PcapHandle getPcapHandle(String mode, int snapLen, int readTimeout, PcapNetworkInterface nif) + throws PcapNativeException { + return PowerMockito.mock(PcapHandle.class); + } + + @Override + protected PcapNetworkInterface getDevByName(String interfaceName) throws PcapNativeException { + return PowerMockito.mock(PcapNetworkInterface.class); + } + public int getPacketQueueSize() { + if (this.packetQueue == null) { + return 0; + } else { + return this.packetQueue.size(); + } + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java new file mode 100644 index 0000000..e77ef4d --- /dev/null +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/ExceptionPacket.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.pcap4j.packet; + +import org.apache.nifi.processor.exception.ProcessException; + +public class ExceptionPacket extends TestPacket { + + public ExceptionPacket(byte[] rawData, int offset, int length) { + super(rawData, offset, length); + } + + @Override + public Packet getPayload() { + return new TestPacket(this.getRawData(), 0, this.getRawData().length) { + @Override + public byte[] getRawData() { + throw new ProcessException("test exception"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/af7b4b16/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java new file mode 100644 index 0000000..3580dbf --- /dev/null +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/test/java/org/pcap4j/packet/TestPacket.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.pcap4j.packet; + +/** + * A test Packet implementation that makes SimplePacket public. + */ +public class TestPacket extends SimplePacket { + + public TestPacket(byte[] rawData, int offset, int length) { + super(rawData, offset, length); + } + + @Override + protected String modifier() { + return null; + } + + @Override + public Packet.Builder getBuilder() { + return null; + } + + @Override + public Packet getPayload() { + return new TestPacket(this.getRawData(), 0, this.getRawData().length); + } + +}
