TCP testing

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2da15e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2da15e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2da15e

Branch: refs/heads/NIFI-274
Commit: fc2da15e8bd04d6ce239e7499fe65230ae161b45
Parents: 38ffa0a
Author: Tony Kurc <[email protected]>
Authored: Sat Oct 31 00:51:56 2015 -0400
Committer: Tony Kurc <[email protected]>
Committed: Sat Oct 31 00:51:56 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ListenSyslog.java  | 190 ++++++++++---------
 1 file changed, 103 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2da15e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index c585874..066a318 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -24,6 +24,7 @@ import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
@@ -173,7 +174,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         parser = new SyslogParser(Charset.forName(charSet));
         bufferPool = new BufferPool(context.getMaxConcurrentTasks(), 
bufferSize, false, Integer.MAX_VALUE);
-        syslogEvents = new LinkedBlockingQueue<>(40000);
+        syslogEvents = new LinkedBlockingQueue<>(10);
         errorEvents = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
@@ -317,47 +318,23 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         @Override
         public void run() {
             final ByteBuffer buffer = bufferPool.poll();
-            int count = 0;
-            long timeInPut = 0;
-            long timeInParse =0;
-            long totalTime = 0;
-            long timeInReceive = 0;
-            long now;
-            long then;
             while (!stopped) {
                 try {
-                    if(++count % 1000 == 0){
-                        totalTime = System.currentTimeMillis() - totalTime;
-                        logger.info("time in put {} time in parse {} total 
time {} time in receive {}", new Object[]{timeInPut, timeInParse, totalTime, 
timeInReceive});
-                        timeInPut = 0;
-                        timeInParse = 0;
-                        timeInReceive =0;
-                        totalTime = System.currentTimeMillis();
-                    }
                     int selected = selector.select();
                     if (selected > 0){
                         Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                        while(selectorKeys.hasNext()){
+                        while (selectorKeys.hasNext()){
                             SelectionKey key = selectorKeys.next();
                             selectorKeys.remove();
-                            if(key.isValid()){
-                                DatagramChannel channel = (DatagramChannel) 
key.channel();
-                                then = System.currentTimeMillis();
-                                SocketAddress sender = channel.receive(buffer);
-                                while((sender = channel.receive(buffer)) != 
null) {
-                                    now = System.currentTimeMillis();
-                                    timeInReceive += (now - then);
-                                    then = System.currentTimeMillis();
-
-                                    final SyslogEvent event = 
syslogParser.parseEvent(buffer);
-                                    now = System.currentTimeMillis();
-                                    timeInParse += (now - then);
-                                    logger.trace(event.getFullMessage());
-                                    then = System.currentTimeMillis();
-                                    syslogEvents.put(event); // block until 
space is available
-                                    now = System.currentTimeMillis();
-                                    timeInPut += (now - then);
-                                }
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            DatagramChannel channel = (DatagramChannel) 
key.channel();
+                            SocketAddress sender = channel.receive(buffer);
+                            while (!stopped && (sender = 
channel.receive(buffer)) != null) {
+                                final SyslogEvent event = 
syslogParser.parseEvent(buffer);
+                                logger.trace(event.getFullMessage());
+                                syslogEvents.put(event); // block until space 
is available
                             }
                         }
                     }
@@ -403,6 +380,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
         private ServerSocketChannel serverSocketChannel;
         private ExecutorService executor = Executors.newFixedThreadPool(2);
         private boolean stopped = false;
+        private Selector selector;
+        private BlockingQueue<SelectionKey> keyQueue;
 
         public SocketChannelReader(final BufferPool bufferPool, final 
SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents,
                                    final ProcessorLog logger) {
@@ -410,6 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
+            this.keyQueue = new LinkedBlockingQueue<>(2);
         }
 
         @Override
@@ -424,26 +404,51 @@ public class ListenSyslog extends AbstractSyslogProcessor {
                 }
             }
             serverSocketChannel.socket().bind(new InetSocketAddress(port));
+            selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
         @Override
         public void run() {
             while (!stopped) {
                 try {
-                    final SocketChannel socketChannel = 
serverSocketChannel.accept();
-                    if (socketChannel == null) {
-                        Thread.sleep(1000L); // wait for an incoming 
connection...
-                    } else {
-                        final SocketChannelHandler handler = new 
SocketChannelHandler(
-                                bufferPool, socketChannel, syslogParser, 
syslogEvents, logger);
-                        logger.debug("Accepted incoming connection");
-                        executor.submit(handler);
+                    int selected = selector.select();
+                    if (selected > 0){
+                        Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
+                        while (selectorKeys.hasNext()){
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()){
+                                continue;
+                            }
+                            if (key.isAcceptable()) {
+                                // TODO: need connection limit
+                                final ServerSocketChannel channel = 
(ServerSocketChannel) key.channel();
+                                final SocketChannel socketChannel = 
channel.accept();
+                                socketChannel.configureBlocking(false);
+                                SelectionKey readKey = 
socketChannel.register(selector, SelectionKey.OP_READ);
+                                ByteBuffer buffer = bufferPool.poll();
+                                buffer.clear();
+                                buffer.mark();
+                                readKey.attach(buffer);
+                            } else if (key.isReadable()) {
+                                key.interestOps(0);
+
+                                final SocketChannelHandler handler = new 
SocketChannelHandler(key, this, 
+                                        syslogParser, syslogEvents, logger);
+                                logger.debug("Accepted incoming connection");
+                                executor.execute(handler);
+                            }
+                        }
+                    }
+                    // Add back all idle
+                    SelectionKey key;
+                    while((key = keyQueue.poll()) != null){
+                        key.interestOps(SelectionKey.OP_READ);
                     }
                 } catch (IOException e) {
                     logger.error("Error accepting connection from 
SocketChannel", e);
-                } catch (InterruptedException e) {
-                    stop();
-                }
+                } 
             }
         }
 
@@ -454,6 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void stop() {
+            selector.wakeup();
+            
             stopped = true;
         }
 
@@ -474,6 +481,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
         }
 
+        public void completeConnection(SelectionKey key) {
+            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+        }
+
+        public void addBackForSelection(SelectionKey key) {
+            keyQueue.offer(key);
+            selector.wakeup();
+        }
+
     }
 
     /**
@@ -482,17 +498,17 @@ public class ListenSyslog extends AbstractSyslogProcessor {
      */
     public static class SocketChannelHandler implements Runnable {
 
-        private final BufferPool bufferPool;
-        private final SocketChannel socketChannel;
+        private final SelectionKey key;
+        private final SocketChannelReader dispatcher;
         private final SyslogParser syslogParser;
         private final BlockingQueue<SyslogEvent> syslogEvents;
         private final ProcessorLog logger;
         private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
 
-        public SocketChannelHandler(final BufferPool bufferPool, final 
SocketChannel socketChannel, final SyslogParser syslogParser,
+        public SocketChannelHandler(final SelectionKey key, final 
SocketChannelReader dispatcher, final SyslogParser syslogParser,
                                     final BlockingQueue<SyslogEvent> 
syslogEvents, final ProcessorLog logger) {
-            this.bufferPool = bufferPool;
-            this.socketChannel = socketChannel;
+            this.key = key;
+            this.dispatcher = dispatcher;
             this.syslogParser = syslogParser;
             this.syslogEvents = syslogEvents;
             this.logger = logger;
@@ -500,51 +516,51 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         @Override
         public void run() {
+            boolean eof = false;
+            SocketChannel socketChannel = null;
+            ByteBuffer socketBuffer = null;
             try {
-                int bytesRead = 0;
-                while (bytesRead >= 0 && !Thread.interrupted()) {
-
-                    final ByteBuffer buffer = bufferPool.poll();
-                    if (buffer == null) {
-                        Thread.sleep(10L);
-                        logger.debug("no available buffers, continuing...");
-                        continue;
-                    }
-
-                    try {
-                        // read until the buffer is full
-                        bytesRead = socketChannel.read(buffer);
-                        while (bytesRead > 0) {
-                            bytesRead = socketChannel.read(buffer);
-                        }
-                        buffer.flip();
-
-                        // go through the buffer looking for the end of each 
message
-                        int bufferLength = buffer.limit();
-                        for (int i = 0; i < bufferLength; i++) {
-                            byte currByte = buffer.get(i);
-                            currBytes.write(currByte);
-
-                            // at the end of a message so parse an event, 
reset the buffer, and break out of the loop
-                            if (currByte == '\n') {
-                                final SyslogEvent event = 
syslogParser.parseEvent(currBytes.toByteArray());
-                                logger.trace(event.getFullMessage());
-                                syslogEvents.put(event); // block until space 
is available
-                                currBytes.reset();
-                            }
+                int bytesRead;
+                socketChannel = (SocketChannel) key.channel();
+                socketBuffer = (ByteBuffer) key.attachment();
+                // read until the buffer is full
+                while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+                    socketBuffer.flip();
+                    socketBuffer.mark();
+                    int total = socketBuffer.remaining();
+                    // go through the buffer looking for the end of each 
message
+                    for (int i = 0; i < total; i++) {
+                        byte currByte = socketBuffer.get();
+                        currBytes.write(currByte);
+                        // at the end of a message so parse an event, reset 
the buffer, and break out of the loop
+                        if (currByte == '\n') {
+                            final SyslogEvent event = 
syslogParser.parseEvent(currBytes.toByteArray());
+                            logger.trace(event.getFullMessage());
+                            syslogEvents.put(event); // block until space is 
available
+                            currBytes.reset();
+                            socketBuffer.mark();
                         }
-                    } finally {
-                        bufferPool.returnBuffer(buffer, 0);
                     }
+                    socketBuffer.reset();
+                    socketBuffer.compact();
+                    logger.debug("done handling SocketChannel");
+                }
+                if( bytesRead < 0 ){
+                    eof = true;
                 }
-
-                logger.debug("done handling SocketChannel");
             } catch (ClosedByInterruptException | InterruptedException e) {
                 // nothing to do here
             } catch (IOException e) {
                 logger.error("Error reading from channel", e);
+                eof = true;
             } finally {
-                IOUtils.closeQuietly(socketChannel);
+                if(eof == true){
+                    dispatcher.completeConnection(key);
+                    IOUtils.closeQuietly(socketChannel);
+                }
+                else {
+                    dispatcher.addBackForSelection(key);
+                }
             }
         }
 

Reply via email to