Repository: nifi Updated Branches: refs/heads/NIFI-274 5bbdf2a8a -> e486f4619
NIFI-274 Added comments and code review changes Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e486f461 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e486f461 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e486f461 Branch: refs/heads/NIFI-274 Commit: e486f4619757b261c9917e4bfb4f13e05fa2b699 Parents: 5bbdf2a Author: Tony Kurc <[email protected]> Authored: Tue Nov 3 19:35:09 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Tue Nov 3 19:35:09 2015 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/ListenSyslog.java | 110 +++++++++++++------ 1 file changed, 75 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e486f461/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 22ae2f6..eafe694 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -66,7 +67,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.SyslogEvent; import org.apache.nifi.processors.standard.util.SyslogParser; import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.util.file.FileUtils; + @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + @@ -109,12 +110,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { .required(true) .build(); public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max number of TCP connections") - .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") - .addValidator(StandardValidators.createLongValidator(1, 65535, true)) - .defaultValue("2") - .required(true) - .build(); + .name("Max number of TCP connections") + .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -142,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(CHARSET); + descriptors.add(MAX_CONNECTIONS); this.descriptors = Collections.unmodifiableList(descriptors); final Set<Relationship> relationships = new HashSet<>(); @@ -182,8 +184,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { if (protocol.equals(UDP_VALUE)) { maxConnections = 1; - } - else{ + } else{ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); } @@ -339,17 +340,22 @@ public class ListenSyslog extends AbstractSyslogProcessor { int selected = selector.select(); if (selected > 0){ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()){ + while (selectorKeys.hasNext()) { SelectionKey key = selectorKeys.next(); selectorKeys.remove(); - if (!key.isValid()){ + if (!key.isValid()) { continue; } DatagramChannel channel = (DatagramChannel) key.channel(); SocketAddress sender; buffer.clear(); while (!stopped && (sender = channel.receive(buffer)) != null) { - final SyslogEvent event = syslogParser.parseEvent(buffer); + final SyslogEvent event; + if (sender instanceof InetSocketAddress) { + event = syslogParser.parseEvent(buffer, ((InetSocketAddress)sender).getAddress().toString()); + } else { + event = syslogParser.parseEvent(buffer); + } logger.trace(event.getFullMessage()); syslogEvents.put(event); // block until space is available } @@ -394,13 +400,12 @@ public class ListenSyslog extends AbstractSyslogProcessor { private final SyslogParser syslogParser; private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; - private ServerSocketChannel serverSocketChannel; - private ExecutorService executor; - private boolean stopped = false; + private final ExecutorService executor; + private volatile boolean stopped = false; private Selector selector; - private BlockingQueue<SelectionKey> keyQueue; - private int maxConnections; - private int currentConnections = 0; + private final BlockingQueue<SelectionKey> keyQueue; + private final int maxConnections; + private final AtomicInteger currentConnections = new AtomicInteger(0); public SocketChannelReader(final BufferPool bufferPool, final SyslogParser syslogParser, final BlockingQueue<SyslogEvent> syslogEvents, final ProcessorLog logger, final int maxConnections) { @@ -415,7 +420,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public void open(final int port, int maxBufferSize) throws IOException { - serverSocketChannel = ServerSocketChannel.open(); + final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); if (maxBufferSize > 0) { serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); @@ -443,29 +448,38 @@ public class ListenSyslog extends AbstractSyslogProcessor { continue; } if (key.isAcceptable()) { + // Handle new connections coming in final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = channel.accept(); - if(currentConnections == maxConnections){ + // Check for available connections + if (currentConnections.incrementAndGet() > maxConnections){ + currentConnections.decrementAndGet(); logger.info("Rejecting connection from {} because max connections has been met", new Object[]{ socketChannel.getRemoteAddress().toString() }); - FileUtils.closeQuietly(socketChannel); + IOUtils.closeQuietly(socketChannel); + continue; } + logger.debug("Accepted incoming connection from {}", + new Object[]{socketChannel.getRemoteAddress().toString()} ); + // Set socket to non-blocking, and register with selector socketChannel.configureBlocking(false); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + // Prepare the byte buffer for the reads, clear it out and attach to key ByteBuffer buffer = bufferPool.poll(); buffer.clear(); buffer.mark(); readKey.attach(buffer); } else if (key.isReadable()) { + // Clear out the operations the select is interested in until done reading key.interestOps(0); - + // Create and execute the read handler final SocketChannelHandler handler = new SocketChannelHandler(key, this, syslogParser, syslogEvents, logger); - logger.debug("Accepted incoming connection"); + // and launch the thread executor.execute(handler); } } } - // Add back all idle + // Add back all idle sockets to the select SelectionKey key; while((key = keyQueue.poll()) != null){ key.interestOps(SelectionKey.OP_READ); @@ -478,19 +492,23 @@ public class ListenSyslog extends AbstractSyslogProcessor { @Override public int getPort() { - return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort(); + // Return the port for the key listening for accepts + for(SelectionKey key : selector.keys()){ + if (key.isValid() && key.isAcceptable()) { + return ((SocketChannel)key.channel()).socket().getLocalPort(); + } + } + return 0; } @Override public void stop() { - selector.wakeup(); - stopped = true; + selector.wakeup(); } @Override public void close() { - IOUtils.closeQuietly(serverSocketChannel); executor.shutdown(); try { // Wait a while for existing tasks to terminate @@ -503,10 +521,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { // Preserve interrupt status Thread.currentThread().interrupt(); } + for(SelectionKey key : selector.keys()){ + IOUtils.closeQuietly(key.channel()); + } + IOUtils.closeQuietly(selector); } public void completeConnection(SelectionKey key) { + // connection is done. Return the buffer to the pool bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + currentConnections.decrementAndGet(); } public void addBackForSelection(SelectionKey key) { @@ -543,53 +567,69 @@ public class ListenSyslog extends AbstractSyslogProcessor { boolean eof = false; SocketChannel socketChannel = null; ByteBuffer socketBuffer = null; + try { int bytesRead; socketChannel = (SocketChannel) key.channel(); socketBuffer = (ByteBuffer) key.attachment(); // read until the buffer is full - while((bytesRead = socketChannel.read(socketBuffer)) > 0){ + while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { + // prepare byte buffer for reading socketBuffer.flip(); + // mark the current position as start, in case of partial message read socketBuffer.mark(); + + // get total bytes in buffer int total = socketBuffer.remaining(); // go through the buffer looking for the end of each message + currBytes.reset(); for (int i = 0; i < total; i++) { + // NOTE: For higher throughput, the looking for \n and copying into the byte + // stream could be improved + // Pull data out of buffer and cram into byte array byte currByte = socketBuffer.get(); currBytes.write(currByte); - // at the end of a message so parse an event, reset the buffer, and break out of the loop + + // check if at end of a message if (currByte == '\n') { + // parse an event, reset the buffer final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), socketChannel.socket().getInetAddress().toString()); logger.trace(event.getFullMessage()); syslogEvents.put(event); // block until space is available currBytes.reset(); + // Mark this as the start of the next message socketBuffer.mark(); } } + // Preserve bytes in buffer for next call to run + // NOTE: This code could benefit from the two ByteBuffer read calls to avoid + // this compact for higher throughput socketBuffer.reset(); socketBuffer.compact(); logger.debug("done handling SocketChannel"); } + // Check for closed socket if( bytesRead < 0 ){ eof = true; } } catch (ClosedByInterruptException | InterruptedException e) { logger.debug("read loop interrupted, closing connection"); + // Treat same as closed socket eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); + // Treat same as closed socket eof = true; } finally { - if(eof == true){ - dispatcher.completeConnection(key); + if(eof == true) { IOUtils.closeQuietly(socketChannel); - } - else { + dispatcher.completeConnection(key); + } else { dispatcher.addBackForSelection(key); } } } - } static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) {
