NIFI-274 - added use of Selectors for TCP and UDP connections. Added a max connections to the TCP thread
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5bbdf2a8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5bbdf2a8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5bbdf2a8 Branch: refs/heads/NIFI-274 Commit: 5bbdf2a8abfbe3dce00d8ea1f947f30a415c67f3 Parents: 53bd4a1 Author: Tony Kurc <[email protected]> Authored: Fri Oct 30 08:45:06 2015 -0400 Committer: Tony Kurc <[email protected]> Committed: Tue Nov 3 10:29:14 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 286 ++++++++++++------- .../nifi/processors/standard/PutSyslog.java | 45 +-- .../processors/standard/TestListenSyslog.java | 2 +- 3 files changed, 205 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 9f57c9f..22ae2f6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,30 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.io.nio.BufferPool; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; -import org.apache.nifi.stream.io.ByteArrayOutputStream; - import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -48,6 +24,8 @@ import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; @@ -55,6 +33,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,6 +43,31 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SyslogEvent; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.file.FileUtils; + @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " + @@ -104,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { .defaultValue("1 MB") .required(true) .build(); - + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max number of TCP connections") + .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -168,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); - + final int maxConnections; + + if (protocol.equals(UDP_VALUE)) { + maxConnections = 1; + } + else{ + maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); + } + parser = new SyslogParser(Charset.forName(charSet)); - bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE); + bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); syslogEvents = new LinkedBlockingQueue<>(10); errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); // create either a UDP or TCP reader and call open() to bind to the given port - channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents); + channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections); channelReader.open(port, maxChannelBufferSize); final Thread readerThread = new Thread(channelReader); @@ -185,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // visible for testing to be overridden and provide a mock ChannelReader if desired - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); } else { - return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); + return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections); } } @@ -287,6 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final ProcessorLog logger; private DatagramChannel datagramChannel; private volatile boolean stopped = false; + private Selector selector; public DatagramChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) { @@ -308,37 +327,43 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } datagramChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + datagramChannel.register(selector, SelectionKey.OP_READ); } @Override public void run() { + final ByteBuffer buffer = bufferPool.poll(); while (!stopped) { - final ByteBuffer buffer = bufferPool.poll(); try { - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } - - final SocketAddress sender = datagramChannel.receive(buffer); - if (sender == null) { - Thread.sleep(1000L); // nothing to do so wait... - } else { - final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender? - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress sender; + buffer.clear(); + while (!stopped && (sender = channel.receive(buffer)) != null) { + final SyslogEvent event = syslogParser.parseEvent(buffer); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + } + } } } catch (InterruptedException e) { - stop(); + stopped = true; } catch (IOException e) { logger.error("Error reading from DatagramChannel", e); - } finally { - if (buffer != null) { - bufferPool.returnBuffer(buffer, 0); - } } } + if (buffer != null) { + bufferPool.returnBuffer(buffer, 0); + } } @Override @@ -348,11 +373,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void stop() { + selector.wakeup(); stopped = true; } @Override public void close() { + IOUtils.closeQuietly(selector); IOUtils.closeQuietly(datagramChannel); } } @@ -368,15 +395,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; private ServerSocketChannel serverSocketChannel; - private ExecutorService executor = Executors.newFixedThreadPool(2); - private volatile boolean stopped = false; + private ExecutorService executor; + private boolean stopped = false; + private Selector selector; + private BlockingQueue<SelectionKey> keyQueue; + private int maxConnections; + private int currentConnections = 0; public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, - final ProcessorLog logger) { + final ProcessorLog logger, final int maxConnections) { this.bufferPool = bufferPool; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; + this.maxConnections = maxConnections; + this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.executor = Executors.newFixedThreadPool(maxConnections); } @Override @@ -391,26 +425,54 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } serverSocketChannel.socket().bind(new InetSocketAddress(port)); + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while (!stopped) { try { - final SocketChannel socketChannel = serverSocketChannel.accept(); - if (socketChannel == null) { - Thread.sleep(1000L); // wait for an incoming connection... - } else { - final SocketChannelHandler handler = new SocketChannelHandler( - bufferPool, socketChannel, syslogParser, syslogEvents, logger); - logger.debug("Accepted incoming connection"); - executor.submit(handler); + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + if (key.isAcceptable()) { + final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); + final SocketChannel socketChannel = channel.accept(); + if(currentConnections == maxConnections){ + logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() }); + FileUtils.closeQuietly(socketChannel); + } + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + ByteBuffer buffer = bufferPool.poll(); + buffer.clear(); + buffer.mark(); + readKey.attach(buffer); + } else if (key.isReadable()) { + key.interestOps(0); + + final SocketChannelHandler handler = new SocketChannelHandler(key, this, + syslogParser, syslogEvents, logger); + logger.debug("Accepted incoming connection"); + executor.execute(handler); + } + } + } + // Add back all idle + SelectionKey key; + while((key = keyQueue.poll()) != null){ + key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { logger.error("Error accepting connection from SocketChannel", e); - } catch (InterruptedException e) { - stop(); - } + } } } @@ -421,6 +483,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void stop() { + selector.wakeup(); + stopped = true; } @@ -441,6 +505,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + public void completeConnection(SelectionKey key) { + bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + } + + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); + } + } /** @@ -449,17 +522,17 @@ public class ListenSyslog extends AbstractSyslogProcessor { */ public static class SocketChannelHandler implements Runnable { - private final BufferPool bufferPool; - private final SocketChannel socketChannel; + private final SelectionKey key; + private final SocketChannelReader dispatcher; private final SyslogParser syslogParser; private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - public SocketChannelHandler(final BufferPool bufferPool, final SocketChannel socketChannel, final SyslogParser syslogParser, + public SocketChannelHandler(final SelectionKey key, final SocketChannelReader dispatcher, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger) { - this.bufferPool = bufferPool; - this.socketChannel = socketChannel; + this.key = key; + this.dispatcher = dispatcher; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; @@ -467,52 +540,53 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void run() { + boolean eof = false; + SocketChannel socketChannel = null; + ByteBuffer socketBuffer = null; try { - int bytesRead = 0; - while (bytesRead >= 0 && !Thread.interrupted()) { - - final ByteBuffer buffer = bufferPool.poll(); - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } - - try { - // read until the buffer is full - bytesRead = socketChannel.read(buffer); - while (bytesRead > 0) { - bytesRead = socketChannel.read(buffer); - } - buffer.flip(); - - // go through the buffer looking for the end of each message - int bufferLength = buffer.limit(); - for (int i = 0; i < bufferLength; i++) { - byte currByte = buffer.get(i); - currBytes.write(currByte); - - // at the end of a message so parse an event, reset the buffer, and break out of the loop - if (currByte == '\n') { - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), - socketChannel.socket().getInetAddress().toString()); - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available - currBytes.reset(); - } + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + socketBuffer = (ByteBuffer) key.attachment(); + // read until the buffer is full + while((bytesRead = socketChannel.read(socketBuffer)) > 0){ + socketBuffer.flip(); + socketBuffer.mark(); + int total = socketBuffer.remaining(); + // go through the buffer looking for the end of each message + for (int i = 0; i < total; i++) { + byte currByte = socketBuffer.get(); + currBytes.write(currByte); + // at the end of a message so parse an event, reset the buffer, and break out of the loop + if (currByte == '\n') { + final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), + socketChannel.socket().getInetAddress().toString()); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + currBytes.reset(); + socketBuffer.mark(); } - } finally { - bufferPool.returnBuffer(buffer, 0); } + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("done handling SocketChannel"); + } + if( bytesRead < 0 ){ + eof = true; } - - logger.debug("done handling SocketChannel"); } catch (ClosedByInterruptException | InterruptedException e) { - // nothing to do here + logger.debug("read loop interrupted, closing connection"); + eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); + eof = true; } finally { - IOUtils.closeQuietly(socketChannel); + if(eof == true){ + dispatcher.completeConnection(key); + IOUtils.closeQuietly(socketChannel); + } + else { + dispatcher.addBackForSelection(key); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 502b26f..5e558ca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -225,6 +225,29 @@ public class PutSyslog extends AbstractSyslogProcessor { } } + private void pruneIdleSenders(final long idleThreshold){ + long currentTime = System.currentTimeMillis(); + final List<ChannelSender> putBack = new ArrayList<>(); + + // if a connection hasn't been used with in the threshold then it gets closed + ChannelSender sender; + while ((sender = senderPool.poll()) != null) { + if (currentTime > (sender.lastUsed + idleThreshold)) { + getLogger().debug("Closing idle connection..."); + sender.close(); + } else { + putBack.add(sender); + } + } + // re-queue senders that weren't idle, but if the queue is full then close the sender + for (ChannelSender putBackSender : putBack) { + boolean returned = senderPool.offer(putBackSender); + if (!returned) { + putBackSender.close(); + } + } + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final String protocol = context.getProperty(PROTOCOL).getValue(); @@ -232,27 +255,7 @@ public class PutSyslog extends AbstractSyslogProcessor { final List<FlowFile> flowFiles = session.get(batchSize); if (flowFiles == null || flowFiles.isEmpty()) { - final List<ChannelSender> putBack = new ArrayList<>(); - final long expirationThreshold = context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); - - // if a connection hasn't been used with in the threshold then it gets closed - ChannelSender sender; - while ((sender = senderPool.poll()) != null) { - if (System.currentTimeMillis() > (sender.lastUsed + expirationThreshold)) { - getLogger().debug("Closing idle connection..."); - sender.close(); - } else { - putBack.add(sender); - } - } - - // re-queue senders that weren't idle, but if the queue is full then close the sender - for (ChannelSender putBackSender : putBack) { - boolean returned = senderPool.offer(putBackSender); - if (!returned) { - putBackSender.close(); - } - } + pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); return; } http://git-wip-us.apache.org/repos/asf/nifi/blob/5bbdf2a8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 0e0d972..7796868 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -391,7 +391,7 @@ public class TestListenSyslog { } @Override - protected ChannelReader createChannelReader(String protocol, BufferPool bufferPool, SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) throws IOException { + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) { return new ChannelReader() { @Override public void open(int port, int maxBufferSize) throws IOException {
