Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 6787afc30 -> ca7e3c0da


NIFI-548 - ListenUDP should support generation of a FlowFile per datagram


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/badf1018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/badf1018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/badf1018

Branch: refs/heads/develop
Commit: badf1018c1765e4d1b277f3ede5a3231d3fe1734
Parents: c4d1666
Author: bbende <[email protected]>
Authored: Fri May 8 23:18:05 2015 -0400
Committer: bbende <[email protected]>
Committed: Wed May 13 18:09:16 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/io/nio/ChannelDispatcher.java     |  6 ++++--
 .../java/org/apache/nifi/io/nio/ChannelListener.java  |  4 ++--
 .../org/apache/nifi/io/nio/DatagramChannelReader.java |  8 ++++++--
 .../org/apache/nifi/io/nio/example/ServerMain.java    |  2 +-
 .../apache/nifi/processors/standard/ListenUDP.java    | 14 ++++++++++++--
 .../processors/standard/util/UDPStreamConsumer.java   | 11 +++++++----
 6 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
index 824f2df..a4308e3 100644
--- 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
+++ 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -45,17 +45,19 @@ public final class ChannelDispatcher implements Runnable {
     private final StreamConsumerFactory factory;
     private final AtomicLong channelReaderFrequencyMilliseconds = new 
AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
     private final long timeout;
+    private final boolean readSingleDatagram;
     private volatile boolean stop = false;
     public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
 
     public ChannelDispatcher(final Selector serverSocketSelector, final 
Selector socketChannelSelector, final ScheduledExecutorService service,
-            final StreamConsumerFactory factory, final BufferPool buffers, 
final long timeout, final TimeUnit unit) {
+            final StreamConsumerFactory factory, final BufferPool buffers, 
final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
         this.serverSocketSelector = serverSocketSelector;
         this.socketChannelSelector = socketChannelSelector;
         this.executor = service;
         this.factory = factory;
         emptyBuffers = buffers;
         this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+        this.readSingleDatagram = readSingleDatagram;
     }
 
     public void setChannelReaderFrequency(final long period, final TimeUnit 
timeUnit) {
@@ -136,7 +138,7 @@ public final class ChannelDispatcher implements Runnable {
             // for a DatagramChannel we don't want to create a new reader 
unless it is a new DatagramChannel. The only
             // way to tell if it's new is the lack of an attachment.
             if (channel instanceof DatagramChannel && 
socketChannelKey.attachment() == null) {
-                reader = new 
DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, 
emptyBuffers, factory);
+                reader = new 
DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, 
emptyBuffers, factory, readSingleDatagram);
                 socketChannelKey.attach(reader);
                 final ScheduledFuture<?> readerFuture = 
executor.scheduleWithFixedDelay(reader, 10L, 
channelReaderFrequencyMilliseconds.get(),
                         TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
index 7cbf589..ab77063 100644
--- 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
+++ 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -75,14 +75,14 @@ public final class ChannelListener {
     private volatile long channelReaderFrequencyMSecs = 50;
 
     public ChannelListener(final int threadPoolSize, final 
StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
-            TimeUnit unit) throws IOException {
+            TimeUnit unit, final boolean readSingleDatagram) throws 
IOException {
         this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); 
// need to allow for long running ChannelDispatcher thread
         this.serverSocketSelector = Selector.open();
         this.socketChannelSelector = Selector.open();
         this.bufferPool = bufferPool;
         this.initialBufferPoolSize = bufferPool.size();
         channelDispatcher = new ChannelDispatcher(serverSocketSelector, 
socketChannelSelector, executor, consumerFactory, bufferPool,
-                timeout, unit);
+                timeout, unit, readSingleDatagram);
         executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
index db76279..a4670b9 100644
--- 
a/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
+++ 
b/nifi/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
@@ -27,8 +27,12 @@ public final class DatagramChannelReader extends 
AbstractChannelReader {
 
     public static final int MAX_UDP_PACKET_SIZE = 65507;
 
-    public DatagramChannelReader(final String id, final SelectionKey key, 
final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+    private final boolean readSingleDatagram;
+
+    public DatagramChannelReader(final String id, final SelectionKey key, 
final BufferPool empties, final StreamConsumerFactory consumerFactory,
+            final boolean readSingleDatagram) {
         super(id, key, empties, consumerFactory);
+        this.readSingleDatagram = readSingleDatagram;
     }
 
     /**
@@ -45,7 +49,7 @@ public final class DatagramChannelReader extends 
AbstractChannelReader {
         final DatagramChannel dChannel = (DatagramChannel) key.channel();
         final int initialBufferPosition = buffer.position();
         while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && 
key.isReadable()) {
-            if (dChannel.receive(buffer) == null) {
+            if (dChannel.receive(buffer) == null || readSingleDatagram) {
                 break;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
 
b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
index 27d5ccc..a266ade 100644
--- 
a/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
+++ 
b/nifi/nifi-commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
@@ -52,7 +52,7 @@ public final class ServerMain {
         ChannelListener listener = null;
         try {
             executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, 
TimeUnit.SECONDS);
-            listener = new ChannelListener(5, new 
ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, 
TimeUnit.MILLISECONDS);
+            listener = new ChannelListener(5, new 
ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, 
TimeUnit.MILLISECONDS, false);
             listener.setChannelReaderSchedulingPeriod(50L, 
TimeUnit.MILLISECONDS);
             listener.addDatagramChannel(null, 20000, 32 << 20);
             LOGGER.info("Listening for UDP data on port 20000");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index fa60d6b..6a88bd4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -147,6 +147,14 @@ public class ListenUDP extends 
AbstractSessionFactoryProcessor {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor FLOW_FILE_PER_DATAGRAM = new 
PropertyDescriptor.Builder()
+            .name("FlowFile Per Datagram")
+            .description("Determines if this processor emits each datagram as 
a FlowFile, or if multiple datagrams can be placed in a single FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
             .name("Max Buffer Size")
             .description("Determines the size each receive buffer may be")
@@ -273,6 +281,7 @@ public class ListenUDP extends 
AbstractSessionFactoryProcessor {
         props.add(RECV_BUFFER_COUNT);
         props.add(FLOW_FILES_PER_SESSION);
         props.add(RECV_TIMEOUT);
+        props.add(FLOW_FILE_PER_DATAGRAM);
         properties = Collections.unmodifiableList(props);
     }
     // defaults
@@ -429,18 +438,19 @@ public class ListenUDP extends 
AbstractSessionFactoryProcessor {
                 final String nicIPAddressStr = 
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
                 final Double flowFileSizeTrigger = 
context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
                 final int recvTimeoutMS = 
context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+                final boolean flowFilePerDatagram = 
context.getProperty(FLOW_FILE_PER_DATAGRAM).asBoolean();
                 final StreamConsumerFactory consumerFactory = new 
StreamConsumerFactory() {
 
                     @Override
                     public StreamConsumer newInstance(final String streamId) {
-                        final UDPStreamConsumer consumer = new 
UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), 
getLogger());
+                        final UDPStreamConsumer consumer = new 
UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), 
getLogger(), flowFilePerDatagram);
                         consumerRef.set(consumer);
                         return consumer;
                     }
                 };
                 final int readerMilliseconds = 
context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
                 final BufferPool bufferPool = new BufferPool(bufferCount, 
bufferSize.intValue(), false, Integer.MAX_VALUE);
-                channelListener = new 
ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, 
recvTimeoutMS, TimeUnit.MILLISECONDS);
+                channelListener = new 
ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, 
recvTimeoutMS, TimeUnit.MILLISECONDS, flowFilePerDatagram);
                 // specifying a sufficiently low number for each stream to be 
fast enough though very efficient
                 
channelListener.setChannelReaderSchedulingPeriod(readerMilliseconds, 
TimeUnit.MILLISECONDS);
                 InetAddress nicIPAddress = null;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/badf1018/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
index 6170509..38f8add 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/UDPStreamConsumer.java
@@ -54,11 +54,12 @@ public class UDPStreamConsumer implements StreamConsumer {
     private ProcessSession session;
     private final UDPConsumerCallback udpCallback;
 
-    public UDPStreamConsumer(final String streamId, final List<FlowFile> 
newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger) {
+    public UDPStreamConsumer(final String streamId, final List<FlowFile> 
newFlowFiles, final long fileSizeTrigger, final ProcessorLog logger,
+            final boolean flowFilePerDatagram) {
         this.uniqueId = streamId;
         this.newFlowFileQueue = newFlowFiles;
         this.logger = logger;
-        this.udpCallback = new UDPConsumerCallback(filledBuffers, 
fileSizeTrigger);
+        this.udpCallback = new UDPConsumerCallback(filledBuffers, 
fileSizeTrigger, flowFilePerDatagram);
     }
 
     @Override
@@ -173,10 +174,12 @@ public class UDPStreamConsumer implements StreamConsumer {
         BufferPool bufferPool;
         final BlockingQueue<ByteBuffer> filledBuffers;
         final long fileSizeTrigger;
+        final boolean flowFilePerDatagram;
 
-        public UDPConsumerCallback(final BlockingQueue<ByteBuffer> 
filledBuffers, final long fileSizeTrigger) {
+        public UDPConsumerCallback(final BlockingQueue<ByteBuffer> 
filledBuffers, final long fileSizeTrigger, final boolean flowFilePerDatagram) {
             this.filledBuffers = filledBuffers;
             this.fileSizeTrigger = fileSizeTrigger;
+            this.flowFilePerDatagram = flowFilePerDatagram;
         }
 
         public void setBufferPool(BufferPool pool) {
@@ -196,7 +199,7 @@ public class UDPStreamConsumer implements StreamConsumer {
                                 bytesWrittenThisPass += wbc.write(buffer);
                             }
                             totalBytes += bytesWrittenThisPass;
-                            if (totalBytes > fileSizeTrigger) {
+                            if (totalBytes > fileSizeTrigger || 
flowFilePerDatagram) {
                                 break;// this is enough data
                             }
                         } finally {

Reply via email to