TCP testing
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2da15e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2da15e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2da15e Branch: refs/heads/NIFI-274 Commit: fc2da15e8bd04d6ce239e7499fe65230ae161b45 Parents: 38ffa0a Author: Tony Kurc <[email protected]> Authored: Sat Oct 31 00:51:56 2015 -0400 Committer: Tony Kurc <[email protected]> Committed: Sat Oct 31 00:51:56 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 190 ++++++++++--------- 1 file changed, 103 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2da15e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index c585874..066a318 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -24,6 +24,7 @@ import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -173,7 +174,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { parser = new SyslogParser(Charset.forName(charSet)); bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE); - syslogEvents = new LinkedBlockingQueue<>(40000); + syslogEvents = new LinkedBlockingQueue<>(10); errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); // create either a UDP or TCP reader and call open() to bind to the given port @@ -317,47 +318,23 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void run() { final ByteBuffer buffer = bufferPool.poll(); - int count = 0; - long timeInPut = 0; - long timeInParse =0; - long totalTime = 0; - long timeInReceive = 0; - long now; - long then; while (!stopped) { try { - if(++count % 1000 == 0){ - totalTime = System.currentTimeMillis() - totalTime; - logger.info("time in put {} time in parse {} total time {} time in receive {}", new Object[]{timeInPut, timeInParse, totalTime, timeInReceive}); - timeInPut = 0; - timeInParse = 0; - timeInReceive =0; - totalTime = System.currentTimeMillis(); - } int selected = selector.select(); if (selected > 0){ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - while(selectorKeys.hasNext()){ + while (selectorKeys.hasNext()){ SelectionKey key = selectorKeys.next(); selectorKeys.remove(); - if(key.isValid()){ - DatagramChannel channel = (DatagramChannel) key.channel(); - then = System.currentTimeMillis(); - SocketAddress sender = channel.receive(buffer); - while((sender = channel.receive(buffer)) != null) { - now = System.currentTimeMillis(); - timeInReceive += (now - then); - then = System.currentTimeMillis(); - - final SyslogEvent event = syslogParser.parseEvent(buffer); - now = System.currentTimeMillis(); - timeInParse += (now - then); - logger.trace(event.getFullMessage()); - then = System.currentTimeMillis(); - syslogEvents.put(event); // block until space is available - now = System.currentTimeMillis(); - timeInPut += (now - then); - } + if (!key.isValid()){ + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress sender = channel.receive(buffer); + while (!stopped && (sender = channel.receive(buffer)) != null) { + final SyslogEvent event = syslogParser.parseEvent(buffer); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available } } } @@ -403,6 +380,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { private ServerSocketChannel serverSocketChannel; private ExecutorService executor = Executors.newFixedThreadPool(2); private boolean stopped = false; + private Selector selector; + private BlockingQueue<SelectionKey> keyQueue; public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) { @@ -410,6 +389,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; + this.keyQueue = new LinkedBlockingQueue<>(2); } @Override @@ -424,26 +404,51 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } serverSocketChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while (!stopped) { try { - final SocketChannel socketChannel = serverSocketChannel.accept(); - if (socketChannel == null) { - Thread.sleep(1000L); // wait for an incoming connection... - } else { - final SocketChannelHandler handler = new SocketChannelHandler( - bufferPool, socketChannel, syslogParser, syslogEvents, logger); - logger.debug("Accepted incoming connection"); - executor.submit(handler); + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + if (key.isAcceptable()) { + // TODO: need connection limit + final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); + final SocketChannel socketChannel = channel.accept(); + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + ByteBuffer buffer = bufferPool.poll(); + buffer.clear(); + buffer.mark(); + readKey.attach(buffer); + } else if (key.isReadable()) { + key.interestOps(0); + + final SocketChannelHandler handler = new SocketChannelHandler(key, this, + syslogParser, syslogEvents, logger); + logger.debug("Accepted incoming connection"); + executor.execute(handler); + } + } + } + // Add back all idle + SelectionKey key; + while((key = keyQueue.poll()) != null){ + key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { logger.error("Error accepting connection from SocketChannel", e); - } catch (InterruptedException e) { - stop(); - } + } } } @@ -454,6 +459,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void stop() { + selector.wakeup(); + stopped = true; } @@ -474,6 +481,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + public void completeConnection(SelectionKey key) { + bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + } + + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); + } + } /** @@ -482,17 +498,17 @@ public class ListenSyslog extends AbstractSyslogProcessor { */ public static class SocketChannelHandler implements Runnable { - private final BufferPool bufferPool; - private final SocketChannel socketChannel; + private final SelectionKey key; + private final SocketChannelReader dispatcher; private final SyslogParser syslogParser; private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser, + public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) { - this.bufferPool = bufferPool; - this.socketChannel = socketChannel; + this.key = key; + this.dispatcher = dispatcher; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; @@ -500,51 +516,51 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void run() { + boolean eof = false; + SocketChannel socketChannel = null; + ByteBuffer socketBuffer = null; try { - int bytesRead = 0; - while (bytesRead >= 0 && !Thread.interrupted()) { - - final ByteBuffer buffer = bufferPool.poll(); - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } - - try { - // read until the buffer is full - bytesRead = socketChannel.read(buffer); - while (bytesRead > 0) { - bytesRead = socketChannel.read(buffer); - } - buffer.flip(); - - // go through the buffer looking for the end of each message - int bufferLength = buffer.limit(); - for (int i = 0; i < bufferLength; i++) { - byte currByte = buffer.get(i); - currBytes.write(currByte); - - // at the end of a message so parse an event, reset the buffer, and break out of the loop - if (currByte == '\n') { - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray()); - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available - currBytes.reset(); - } + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + socketBuffer = (ByteBuffer) key.attachment(); + // read until the buffer is full + while((bytesRead = socketChannel.read(socketBuffer)) > 0){ + socketBuffer.flip(); + socketBuffer.mark(); + int total = socketBuffer.remaining(); + // go through the buffer looking for the end of each message + for (int i = 0; i < total; i++) { + byte currByte = socketBuffer.get(); + currBytes.write(currByte); + // at the end of a message so parse an event, reset the buffer, and break out of the loop + if (currByte == '\n') { + final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray()); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + currBytes.reset(); + socketBuffer.mark(); } - } finally { - bufferPool.returnBuffer(buffer, 0); } + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("done handling SocketChannel"); + } + if( bytesRead < 0 ){ + eof = true; } - - logger.debug("done handling SocketChannel"); } catch (ClosedByInterruptException | InterruptedException e) { // nothing to do here } catch (IOException e) { logger.error("Error reading from channel", e); + eof = true; } finally { - IOUtils.closeQuietly(socketChannel); + if(eof == true){ + dispatcher.completeConnection(key); + IOUtils.closeQuietly(socketChannel); + } + else { + dispatcher.addBackForSelection(key); + } } }
