started work on max connections
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f58b2af Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f58b2af Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f58b2af Branch: refs/heads/NIFI-274 Commit: 7f58b2af333547124c497e373c666715623c92a3 Parents: 2c2c6a2 Author: Tony Kurc <[email protected]> Authored: Sat Oct 31 13:17:34 2015 -0400 Committer: Tony Kurc <[email protected]> Committed: Sat Oct 31 13:17:34 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 45 +++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7f58b2af/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index fd93847..457ec5d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -67,6 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.util.file.FileUtils; @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + @@ -107,7 +108,13 @@ public class ListenSyslog extends AbstractSyslogProcessor { .defaultValue("1 MB") .required(true) .build(); - + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max number of TCP connections") + .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -171,14 +178,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); - + final int maxConnections; + + if (protocol.equals(UDP_VALUE)) { + maxConnections = 1; + } + else{ + maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); + } + parser = new SyslogParser(Charset.forName(charSet)); - bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE); + bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); syslogEvents = new LinkedBlockingQueue<>(10); errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); // create either a UDP or TCP reader and call open() to bind to the given port - channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents); + channelReader = createChannelReader(protocol, bufferPool, parser, syslogEvents, maxConnections); channelReader.open(port, maxChannelBufferSize); final Thread readerThread = new Thread(channelReader); @@ -188,12 +203,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // visible for testing to be overridden and provide a mock ChannelReader if desired - protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents) + protected ChannelReader createChannelReader(final String protocol, final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, int maxConnections) throws IOException { if (protocol.equals(UDP_VALUE.getValue())) { return new DatagramChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); } else { - return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger()); + return new SocketChannelReader(bufferPool, syslogParser, syslogEvents, getLogger(), maxConnections); } } @@ -379,18 +394,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; private ServerSocketChannel serverSocketChannel; - private ExecutorService executor = Executors.newFixedThreadPool(2); + private ExecutorService executor; private boolean stopped = false; private Selector selector; private BlockingQueue<SelectionKey> keyQueue; + private int maxConnections; + private int currentConnections = 0; public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, - final ProcessorLog logger) { + final ProcessorLog logger, final int maxConnections) { this.bufferPool = bufferPool; this.syslogParser = syslogParser; this.syslogEvents = syslogEvents; this.logger = logger; - this.keyQueue = new LinkedBlockingQueue<>(2); + this.maxConnections = maxConnections; + this.keyQueue = new LinkedBlockingQueue<>(maxConnections); + this.executor = Executors.newFixedThreadPool(maxConnections); } @Override @@ -423,9 +442,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { continue; } if (key.isAcceptable()) { - // TODO: need connection limit final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = channel.accept(); + if(currentConnections == maxConnections){ + logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() }); + FileUtils.closeQuietly(socketChannel); + } socketChannel.configureBlocking(false); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); ByteBuffer buffer = bufferPool.poll(); @@ -550,7 +572,8 @@ public class ListenSyslog extends AbstractSyslogProcessor { eof = true; } } catch (ClosedByInterruptException | InterruptedException e) { - // nothing to do here + logger.debug("read loop interrupted, closing connection"); + eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); eof = true;
