Repository: nifi
Updated Branches:
  refs/heads/NIFI-964 4abdf6a89 -> 00375ced2


NIFI-964 Initial implementation of GetPcap processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/00375ced
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/00375ced
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/00375ced

Branch: refs/heads/NIFI-964
Commit: 00375ced22a75c73a1a9f601295927346a087a43
Parents: 4abdf6a
Author: Bryan Bende <[email protected]>
Authored: Wed Sep 16 15:12:32 2015 -0400
Committer: Bryan Bende <[email protected]>
Committed: Wed Sep 16 15:12:32 2015 -0400

----------------------------------------------------------------------
 .../nifi-pcap-processors/pom.xml                |  10 +
 .../apache/nifi/processors/pcap/GetPcap.java    | 210 ++++++++++++++++---
 2 files changed, 188 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/00375ced/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
index 6ac6f2e..0907b98 100644
--- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml
@@ -27,6 +27,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.pcap4j</groupId>
+            <artifactId>pcap4j-core</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.pcap4j</groupId>
+            <artifactId>pcap4j-packetfactory-static</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00375ced/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
index f63779a..7a73dbb 100644
--- 
a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
+++ 
b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java
@@ -16,54 +16,120 @@
  */
 package org.apache.nifi.processors.pcap;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.*;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.*;
-
-@Tags({"example"})
-@CapabilityDescription("Provide a description")
-@SeeAlso({})
-@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
-@WritesAttributes({@WritesAttribute(attribute="", description="")})
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.pcap4j.core.BpfProgram;
+import org.pcap4j.core.NotOpenException;
+import org.pcap4j.core.PacketListener;
+import org.pcap4j.core.PcapHandle;
+import org.pcap4j.core.PcapNativeException;
+import org.pcap4j.core.PcapNetworkInterface;
+import org.pcap4j.core.Pcaps;
+import org.pcap4j.packet.Packet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"pcap", "packet", "network"})
+@CapabilityDescription("Uses the pcap4j library to capture packets for a given 
interface. Each packet is emitted as " +
+        "a single FlowFile with the payload of the packet as the FlowFile 
content.")
+@WritesAttributes({@WritesAttribute(attribute="packet.header", 
description="The header of the given packet")})
 public class GetPcap extends AbstractProcessor {
 
-    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
-            .Builder().name("My Property")
-            .description("Example Property")
+    public static final PropertyDescriptor INTERFACE_NAME = new 
PropertyDescriptor
+            .Builder().name("Interface Name")
+            .description("The name of the interface to capture from (ex: en0)")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final Relationship MY_RELATIONSHIP = new 
Relationship.Builder()
-            .name("my_relationship")
-            .description("Example relationship")
+    public static final PropertyDescriptor BPF_EXPRESSION = new 
PropertyDescriptor
+            .Builder().name("BPF Expression")
+            .description("A Berkeley Packet Filter expression used to filter 
packets (ex: icmp)")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    private List<PropertyDescriptor> descriptors;
+    public static final PropertyDescriptor SNAPSHOT_LENGTH = new 
PropertyDescriptor
+            .Builder().name("Snapshot Length")
+            .description("The amount of data in bytes to capture for each 
packet.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("65536")
+            .build();
+
+    public static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name("Read Timeout")
+            .description("The read timeout in milliseconds. Must be 
non-negative. May be ignored by some OSs. " +
+                    "0 means disable buffering on Solaris. 0 means infinite on 
the other OSs. 1 through 9 means infinite on Solaris.")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
 
+    public static final AllowableValue PROMISCUOUS_MODE_VALUE = new 
AllowableValue("PROMISCUOUS", "PROMISCUOUS");
+    public static final AllowableValue NON_PROMISCUOUS_MODE_VALUE = new 
AllowableValue("NONPROMISCUOUS", "NONPROMISCUOUS");
+
+    public static final PropertyDescriptor MODE = new PropertyDescriptor
+            .Builder().name("Mode")
+            .description("The capture mode.")
+            .required(true)
+            .allowableValues(PROMISCUOUS_MODE_VALUE, 
NON_PROMISCUOUS_MODE_VALUE)
+            .defaultValue(PROMISCUOUS_MODE_VALUE.getValue())
+            .build();
+
+    public static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successful captures are route out this relationship")
+            .build();
+
+    public static final String PACKET_HEADER_ATTR = "packet.header";
+
+    private List<PropertyDescriptor> descriptors;
     private Set<Relationship> relationships;
 
+    private AtomicReference<PcapNetworkInterface> pcapNetworkInterface = new 
AtomicReference<>(null);
+    private AtomicReference<PcapHandle> pcapHandle = new 
AtomicReference<>(null);
+    private final BlockingQueue<Packet> packetQueue = new 
LinkedBlockingQueue<>(100);
+
     @Override
     protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.add(MY_PROPERTY);
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(INTERFACE_NAME);
+        descriptors.add(BPF_EXPRESSION);
+        descriptors.add(SNAPSHOT_LENGTH);
+        descriptors.add(READ_TIMEOUT);
+        descriptors.add(MODE);
         this.descriptors = Collections.unmodifiableList(descriptors);
 
-        final Set<Relationship> relationships = new HashSet<Relationship>();
-        relationships.add(MY_RELATIONSHIP);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(SUCCESS);
         this.relationships = Collections.unmodifiableSet(relationships);
     }
 
@@ -78,19 +144,99 @@ public class GetPcap extends AbstractProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) {
+    public void onScheduled(final ProcessContext context) throws 
PcapNativeException, NotOpenException {
+        final String interfaceName = 
context.getProperty(INTERFACE_NAME).getValue();
+        final String filter = context.getProperty(BPF_EXPRESSION).getValue();
+        final String mode = context.getProperty(MODE).getValue();
+        final int snapLen = context.getProperty(SNAPSHOT_LENGTH).asInteger();
+        final int readTimeout = context.getProperty(READ_TIMEOUT).asInteger();
+
+        final PcapNetworkInterface nif = Pcaps.getDevByName(interfaceName);
+        if (nif == null) {
+            throw new IllegalStateException("Unable to locate Network 
Interface " + interfaceName);
+        }
+
+        final PcapHandle handle = nif.openLive(snapLen, 
PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout);
+        if (filter != null && filter.length() > 0) {
+            handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE);
+        }
+
+        final Thread listenerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    handle.loop(-1, new PacketListener() {
+                        @Override
+                        public void gotPacket(Packet packet) {
+                            packetQueue.offer(packet);
+                        }
+                    });
+                    getLogger().info("PcapListener shutting down...");
+                } catch (PcapNativeException e) {
+                    getLogger().error("Error calling pcap native libraries", 
e);
+                } catch (NotOpenException e) {
+                    getLogger().error("Pcap handle was not open", e);
+                } catch (InterruptedException e) {
+                    getLogger().info("PcapListener interrupted");
+                }
+            }
+        });
+        listenerThread.setDaemon(true);
+        listenerThread.setName("PcapListener");
+        listenerThread.start();
+
+        pcapNetworkInterface.set(nif);
+        pcapHandle.set(handle);
+    }
 
+    @OnUnscheduled
+    public void onUnscheduled() {
+        final PcapHandle handle = pcapHandle.get();
+        if (handle != null) {
+            try {
+                getLogger().debug("Breaking pcap loop...");
+                handle.breakLoop();
+            } catch (NotOpenException e) {
+                getLogger().warn("Pcap handle was not open: {}", new 
Object[]{e.getMessage()});
+            } finally {
+                getLogger().debug("Closing pcap Handle...");
+                handle.close();
+            }
+        }
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-               FlowFile flowFile = session.get();
-               if ( flowFile == null ) {
-                       return;
-               }
-
-        // TODO implement
+    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        packetQueue.clear(); // clear any queued packets as they may no longer 
be valid after properties have been changed
+    }
 
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+               final Packet packet = packetQueue.poll();
+        if (packet == null) {
+            return;
+        }
+
+        try {
+            FlowFile flowFile = session.create();
+            flowFile = session.putAttribute(flowFile, PACKET_HEADER_ATTR, 
packet.getHeader().toString());
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream rawOut) throws IOException {
+                    try (final BufferedOutputStream out = new 
BufferedOutputStream(rawOut)) {
+                        out.write(packet.getRawData());
+                        out.flush();
+                    }
+                }
+            });
+
+            getLogger().info("Transferring FlowFile {} to success", new 
Object[]{flowFile});
+            session.transfer(flowFile, SUCCESS);
+        } catch (ProcessException pe) {
+            getLogger().error("Error processing packet", pe);
+            packetQueue.offer(packet); // requeue the packet so we try again
+        }
     }
 
 }

Reply via email to