Repository: nifi Updated Branches: refs/heads/NIFI-964 4abdf6a89 -> 00375ced2
NIFI-964 Initial implementation of GetPcap processor Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/00375ced Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/00375ced Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/00375ced Branch: refs/heads/NIFI-964 Commit: 00375ced22a75c73a1a9f601295927346a087a43 Parents: 4abdf6a Author: Bryan Bende <[email protected]> Authored: Wed Sep 16 15:12:32 2015 -0400 Committer: Bryan Bende <[email protected]> Committed: Wed Sep 16 15:12:32 2015 -0400 ---------------------------------------------------------------------- .../nifi-pcap-processors/pom.xml | 10 + .../apache/nifi/processors/pcap/GetPcap.java | 210 ++++++++++++++++--- 2 files changed, 188 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/00375ced/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml index 6ac6f2e..0907b98 100644 --- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/pom.xml @@ -27,6 +27,16 @@ <dependencies> <dependency> + <groupId>org.pcap4j</groupId> + <artifactId>pcap4j-core</artifactId> + <version>1.6.0</version> + </dependency> + <dependency> + <groupId>org.pcap4j</groupId> + <artifactId>pcap4j-packetfactory-static</artifactId> + <version>1.6.0</version> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/00375ced/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java index f63779a..7a73dbb 100644 --- a/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java +++ b/nifi-nar-bundles/nifi-pcap-bundle/nifi-pcap-processors/src/main/java/org/apache/nifi/processors/pcap/GetPcap.java @@ -16,54 +16,120 @@ */ package org.apache.nifi.processors.pcap; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.*; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; - -import java.util.*; - -@Tags({"example"}) -@CapabilityDescription("Provide a description") -@SeeAlso({}) -@ReadsAttributes({@ReadsAttribute(attribute="", description="")}) -@WritesAttributes({@WritesAttribute(attribute="", description="")}) +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.pcap4j.core.BpfProgram; +import org.pcap4j.core.NotOpenException; +import org.pcap4j.core.PacketListener; +import org.pcap4j.core.PcapHandle; +import org.pcap4j.core.PcapNativeException; +import org.pcap4j.core.PcapNetworkInterface; +import org.pcap4j.core.Pcaps; +import org.pcap4j.packet.Packet; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"pcap", "packet", "network"}) +@CapabilityDescription("Uses the pcap4j library to capture packets for a given interface. Each packet is emitted as " + + "a single FlowFile with the payload of the packet as the FlowFile content.") +@WritesAttributes({@WritesAttribute(attribute="packet.header", description="The header of the given packet")}) public class GetPcap extends AbstractProcessor { - public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor - .Builder().name("My Property") - .description("Example Property") + public static final PropertyDescriptor INTERFACE_NAME = new PropertyDescriptor + .Builder().name("Interface Name") + .description("The name of the interface to capture from (ex: en0)") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final Relationship MY_RELATIONSHIP = new Relationship.Builder() - .name("my_relationship") - .description("Example relationship") + public static final PropertyDescriptor BPF_EXPRESSION = new PropertyDescriptor + .Builder().name("BPF Expression") + .description("A Berkeley Packet Filter expression used to filter packets (ex: icmp)") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - private List<PropertyDescriptor> descriptors; + public static final PropertyDescriptor SNAPSHOT_LENGTH = new PropertyDescriptor + .Builder().name("Snapshot Length") + .description("The amount of data in bytes to capture for each packet.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("65536") + .build(); + + public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor + .Builder().name("Read Timeout") + .description("The read timeout in milliseconds. Must be non-negative. May be ignored by some OSs. " + + "0 means disable buffering on Solaris. 0 means infinite on the other OSs. 1 through 9 means infinite on Solaris.") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .defaultValue("10") + .build(); + public static final AllowableValue PROMISCUOUS_MODE_VALUE = new AllowableValue("PROMISCUOUS", "PROMISCUOUS"); + public static final AllowableValue NON_PROMISCUOUS_MODE_VALUE = new AllowableValue("NONPROMISCUOUS", "NONPROMISCUOUS"); + + public static final PropertyDescriptor MODE = new PropertyDescriptor + .Builder().name("Mode") + .description("The capture mode.") + .required(true) + .allowableValues(PROMISCUOUS_MODE_VALUE, NON_PROMISCUOUS_MODE_VALUE) + .defaultValue(PROMISCUOUS_MODE_VALUE.getValue()) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Successful captures are route out this relationship") + .build(); + + public static final String PACKET_HEADER_ATTR = "packet.header"; + + private List<PropertyDescriptor> descriptors; private Set<Relationship> relationships; + private AtomicReference<PcapNetworkInterface> pcapNetworkInterface = new AtomicReference<>(null); + private AtomicReference<PcapHandle> pcapHandle = new AtomicReference<>(null); + private final BlockingQueue<Packet> packetQueue = new LinkedBlockingQueue<>(100); + @Override protected void init(final ProcessorInitializationContext context) { - final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); - descriptors.add(MY_PROPERTY); + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(INTERFACE_NAME); + descriptors.add(BPF_EXPRESSION); + descriptors.add(SNAPSHOT_LENGTH); + descriptors.add(READ_TIMEOUT); + descriptors.add(MODE); this.descriptors = Collections.unmodifiableList(descriptors); - final Set<Relationship> relationships = new HashSet<Relationship>(); - relationships.add(MY_RELATIONSHIP); + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(SUCCESS); this.relationships = Collections.unmodifiableSet(relationships); } @@ -78,19 +144,99 @@ public class GetPcap extends AbstractProcessor { } @OnScheduled - public void onScheduled(final ProcessContext context) { + public void onScheduled(final ProcessContext context) throws PcapNativeException, NotOpenException { + final String interfaceName = context.getProperty(INTERFACE_NAME).getValue(); + final String filter = context.getProperty(BPF_EXPRESSION).getValue(); + final String mode = context.getProperty(MODE).getValue(); + final int snapLen = context.getProperty(SNAPSHOT_LENGTH).asInteger(); + final int readTimeout = context.getProperty(READ_TIMEOUT).asInteger(); + + final PcapNetworkInterface nif = Pcaps.getDevByName(interfaceName); + if (nif == null) { + throw new IllegalStateException("Unable to locate Network Interface " + interfaceName); + } + + final PcapHandle handle = nif.openLive(snapLen, PcapNetworkInterface.PromiscuousMode.valueOf(mode), readTimeout); + if (filter != null && filter.length() > 0) { + handle.setFilter(filter, BpfProgram.BpfCompileMode.OPTIMIZE); + } + + final Thread listenerThread = new Thread(new Runnable() { + @Override + public void run() { + try { + handle.loop(-1, new PacketListener() { + @Override + public void gotPacket(Packet packet) { + packetQueue.offer(packet); + } + }); + getLogger().info("PcapListener shutting down..."); + } catch (PcapNativeException e) { + getLogger().error("Error calling pcap native libraries", e); + } catch (NotOpenException e) { + getLogger().error("Pcap handle was not open", e); + } catch (InterruptedException e) { + getLogger().info("PcapListener interrupted"); + } + } + }); + listenerThread.setDaemon(true); + listenerThread.setName("PcapListener"); + listenerThread.start(); + + pcapNetworkInterface.set(nif); + pcapHandle.set(handle); + } + @OnUnscheduled + public void onUnscheduled() { + final PcapHandle handle = pcapHandle.get(); + if (handle != null) { + try { + getLogger().debug("Breaking pcap loop..."); + handle.breakLoop(); + } catch (NotOpenException e) { + getLogger().warn("Pcap handle was not open: {}", new Object[]{e.getMessage()}); + } finally { + getLogger().debug("Closing pcap Handle..."); + handle.close(); + } + } } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - // TODO implement + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + packetQueue.clear(); // clear any queued packets as they may no longer be valid after properties have been changed + } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final Packet packet = packetQueue.poll(); + if (packet == null) { + return; + } + + try { + FlowFile flowFile = session.create(); + flowFile = session.putAttribute(flowFile, PACKET_HEADER_ATTR, packet.getHeader().toString()); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream rawOut) throws IOException { + try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) { + out.write(packet.getRawData()); + out.flush(); + } + } + }); + + getLogger().info("Transferring FlowFile {} to success", new Object[]{flowFile}); + session.transfer(flowFile, SUCCESS); + } catch (ProcessException pe) { + getLogger().error("Error processing packet", pe); + packetQueue.offer(packet); // requeue the packet so we try again + } } }
