Merge of master; includes TCP and UDP socket changes - requires a rebase before merging into master
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d759fec Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d759fec Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d759fec Branch: refs/heads/NIFI-274 Commit: 3d759feca529d09f940ffada511a88d689c33d97 Parents: 7f58b2a 664bda8 Author: Tony Kurc <[email protected]> Authored: Mon Nov 2 15:28:26 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Mon Nov 2 15:28:47 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/nifi/action/Action.java | 50 ++ .../java/org/apache/nifi/action/Component.java | 34 + .../java/org/apache/nifi/action/Operation.java | 37 + .../component/details/ComponentDetails.java | 26 + .../component/details/ExtensionDetails.java | 26 + .../details/RemoteProcessGroupDetails.java | 26 + .../nifi/action/details/ActionDetails.java | 26 + .../nifi/action/details/ConfigureDetails.java | 30 + .../nifi/action/details/ConnectDetails.java | 40 + .../apache/nifi/action/details/MoveDetails.java | 30 + .../nifi/action/details/PurgeDetails.java | 28 + .../apache/nifi/processor/ProcessSession.java | 27 + .../nifi/provenance/ProvenanceEventType.java | 31 +- .../nifi/provenance/ProvenanceReporter.java | 37 + .../org/apache/nifi/reporting/EventAccess.java | 13 + .../client/socket/EndpointConnectionPool.java | 3 +- .../src/main/asciidoc/getting-started.adoc | 754 +++++++++++++++++ .../src/main/asciidoc/images/add-processor.png | Bin 31524 -> 92164 bytes nifi-docs/src/main/asciidoc/user-guide.adoc | 2 +- .../org/apache/nifi/util/MockEventAccess.java | 27 + .../apache/nifi/util/MockProcessSession.java | 8 + .../nifi/util/MockProvenanceReporter.java | 35 +- .../nifi/processors/aws/s3/FetchS3Object.java | 2 +- .../org/apache/nifi/admin/dao/ActionDAO.java | 12 +- .../java/org/apache/nifi/admin/dao/UserDAO.java | 3 +- .../nifi/admin/dao/impl/StandardActionDAO.java | 93 ++- 
.../nifi/admin/dao/impl/StandardUserDAO.java | 4 +- .../apache/nifi/admin/service/AuditService.java | 18 +- .../admin/service/action/AddActionsAction.java | 3 +- .../service/action/PurgeActionsAction.java | 3 +- .../service/impl/StandardAuditService.java | 24 +- .../resources/nifi-administration-context.xml | 2 +- .../manager/impl/ClusteredEventAccess.java | 21 +- .../cluster/manager/impl/WebClusterManager.java | 2 +- .../apache/nifi/controller/FlowController.java | 158 ++-- .../repository/BatchingSessionFactory.java | 5 + .../repository/FileSystemRepository.java | 10 +- .../repository/StandardProcessSession.java | 18 +- .../repository/StandardProvenanceReporter.java | 31 +- .../nifi/processor/SimpleProcessLogger.java | 20 +- .../nifi/spring/FlowControllerFactoryBean.java | 8 + .../src/main/resources/nifi-context.xml | 1 + .../controller/StandardFlowServiceTest.java | 5 +- .../repository/TestStandardProcessSession.java | 39 +- .../nifi/processor/TestSimpleProcessLogger.java | 101 +++ .../nifi-framework/nifi-user-actions/pom.xml | 7 + .../java/org/apache/nifi/action/Action.java | 121 --- .../java/org/apache/nifi/action/Component.java | 34 - .../apache/nifi/action/FlowChangeAction.java | 130 +++ .../java/org/apache/nifi/action/Operation.java | 37 - .../component/details/ComponentDetails.java | 26 - .../component/details/ExtensionDetails.java | 34 - .../details/FlowChangeExtensionDetails.java | 35 + .../FlowChangeRemoteProcessGroupDetails.java | 35 + .../details/RemoteProcessGroupDetails.java | 34 - .../nifi/action/details/ActionDetails.java | 26 - .../nifi/action/details/ConfigureDetails.java | 52 -- .../nifi/action/details/ConnectDetails.java | 90 -- .../details/FlowChangeConfigureDetails.java | 55 ++ .../details/FlowChangeConnectDetails.java | 97 +++ .../action/details/FlowChangeMoveDetails.java | 65 ++ .../action/details/FlowChangePurgeDetails.java | 46 + .../apache/nifi/action/details/MoveDetails.java | 61 -- .../nifi/action/details/PurgeDetails.java | 45 - 
.../apache/nifi/audit/ControllerAuditor.java | 27 +- .../nifi/audit/ControllerServiceAuditor.java | 43 +- .../org/apache/nifi/audit/FunnelAuditor.java | 7 +- .../java/org/apache/nifi/audit/NiFiAuditor.java | 25 +- .../java/org/apache/nifi/audit/PortAuditor.java | 27 +- .../apache/nifi/audit/ProcessGroupAuditor.java | 25 +- .../org/apache/nifi/audit/ProcessorAuditor.java | 25 +- .../apache/nifi/audit/RelationshipAuditor.java | 18 +- .../nifi/audit/RemoteProcessGroupAuditor.java | 35 +- .../apache/nifi/audit/ReportingTaskAuditor.java | 25 +- .../org/apache/nifi/audit/SnippetAuditor.java | 26 +- .../nifi/web/StandardNiFiServiceFacade.java | 9 +- .../StandardNiFiWebConfigurationContext.java | 13 +- .../apache/nifi/web/StandardNiFiWebContext.java | 13 +- .../nifi/web/api/ApplicationResource.java | 5 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 20 +- .../nifi/processors/hadoop/FetchHDFS.java | 2 +- .../apache/nifi/processors/kafka/PutKafka.java | 6 +- .../nifi-pcap-processors/.gitignore | 1 - .../nifi-standard-processors/pom.xml | 5 +- .../standard/AbstractSyslogProcessor.java | 5 +- .../processors/standard/AttributesToJSON.java | 242 ++++++ .../processors/standard/FetchFileTransfer.java | 3 +- .../nifi/processors/standard/InvokeHTTP.java | 198 +++-- .../nifi/processors/standard/ListenSyslog.java | 32 +- .../nifi/processors/standard/MergeContent.java | 6 +- .../apache/nifi/processors/standard/PutSQL.java | 156 ++-- .../nifi/processors/standard/ReplaceText.java | 431 +++++++--- .../processors/standard/util/SyslogEvent.java | 13 + .../processors/standard/util/SyslogParser.java | 18 +- .../org.apache.nifi.processor.Processor | 1 + .../standard/TestAttributesToJSON.java | 282 +++++++ .../processors/standard/TestInvokeHTTP.java | 607 +------------- .../processors/standard/TestInvokeHttpSSL.java | 90 ++ .../nifi/processors/standard/TestPutSQL.java | 46 +- .../processors/standard/TestReplaceText.java | 635 +++++++++++++- .../standard/TestReplaceTextLineByLine.java | 336 
-------- .../standard/util/TestInvokeHttpCommon.java | 830 +++++++++++++++++++ .../standard/util/TestSyslogParser.java | 15 + .../AppendLineByLineTest.txt | 11 + .../PrependLineByLineTest.txt | 11 + 105 files changed, 5092 insertions(+), 2136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 457ec5d,9f57c9f..3105b1f --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@@ -16,6 -16,30 +16,7 @@@ */ package org.apache.nifi.processors.standard; -import org.apache.commons.io.IOUtils; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.io.nio.BufferPool; 
-import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; -import org.apache.nifi.stream.io.ByteArrayOutputStream; + import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@@ -44,31 -64,6 +45,33 @@@ import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import 
org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.SyslogEvent; +import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.stream.io.ByteArrayOutputStream; ++ +import org.apache.nifi.util.file.FileUtils; + ++ @Tags({"syslog", "listen", "udp", "tcp", "logs"}) @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " + "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " + @@@ -108,14 -104,8 +112,13 @@@ public class ListenSyslog extends Abstr .defaultValue("1 MB") .required(true) .build(); - - + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max number of TCP connections") - .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") - .addValidator(StandardValidators.createLongValidator(1, 65535, true)) - .defaultValue("2") - .required(true) - .build(); - ++ .name("Max number of TCP connections") ++ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode") ++ .addValidator(StandardValidators.createLongValidator(1, 65535, true)) ++ .defaultValue("2") ++ .required(true) ++ .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.") @@@ -178,17 -168,9 +181,17 @@@ final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); + final int maxConnections; + + if (protocol.equals(UDP_VALUE)) { + maxConnections = 1; - } - else{ ++ } else { + 
maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); + } - + parser = new SyslogParser(Charset.forName(charSet)); - bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE); + bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE); ++ syslogEvents = new LinkedBlockingQueue<>(10); errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); @@@ -332,37 -312,33 +336,38 @@@ @Override public void run() { + final ByteBuffer buffer = bufferPool.poll(); while (!stopped) { - final ByteBuffer buffer = bufferPool.poll(); try { - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } - - final SocketAddress sender = datagramChannel.receive(buffer); - if (sender == null) { - Thread.sleep(1000L); // nothing to do so wait... - } else { - final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender? - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available + int selected = selector.select(); + if (selected > 0){ + Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); + while (selectorKeys.hasNext()){ + SelectionKey key = selectorKeys.next(); + selectorKeys.remove(); + if (!key.isValid()){ + continue; + } + DatagramChannel channel = (DatagramChannel) key.channel(); + SocketAddress sender; + buffer.clear(); + while (!stopped && (sender = channel.receive(buffer)) != null) { ++ // TODO: Add sender address + final SyslogEvent event = syslogParser.parseEvent(buffer); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + } + } } } catch (InterruptedException e) { - stop(); + stopped = true; } catch (IOException e) { logger.error("Error reading from DatagramChannel", e); - } finally { - if (buffer != null) { - bufferPool.returnBuffer(buffer, 0); - } } } + if (buffer != null) { + 
bufferPool.returnBuffer(buffer, 0); + } } @Override @@@ -482,8 -421,6 +487,7 @@@ @Override public void stop() { + selector.wakeup(); - stopped = true; } @@@ -504,15 -441,6 +508,14 @@@ } } + public void completeConnection(SelectionKey key) { + bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0); + } + + public void addBackForSelection(SelectionKey key) { + keyQueue.offer(key); + selector.wakeup(); + } - } /** @@@ -520,9 -448,9 +523,8 @@@ * processing, otherwise the buffer is returned to the buffer pool. */ public static class SocketChannelHandler implements Runnable { -- - private final BufferPool bufferPool; - private final SocketChannel socketChannel; + private final SelectionKey key; + private final SocketChannelReader dispatcher; private final SyslogParser syslogParser; private final BlockingQueue<SyslogEvent> syslogEvents; private final ProcessorLog logger; @@@ -539,52 -467,52 +541,54 @@@ @Override public void run() { - try { - int bytesRead = 0; - while (bytesRead >= 0 && !Thread.interrupted()) { - - final ByteBuffer buffer = bufferPool.poll(); - if (buffer == null) { - Thread.sleep(10L); - logger.debug("no available buffers, continuing..."); - continue; - } + - try { - // read until the buffer is full - bytesRead = socketChannel.read(buffer); - while (bytesRead > 0) { - bytesRead = socketChannel.read(buffer); - } - buffer.flip(); - - // go through the buffer looking for the end of each message - int bufferLength = buffer.limit(); - for (int i = 0; i < bufferLength; i++) { - byte currByte = buffer.get(i); - currBytes.write(currByte); - - // at the end of a message so parse an event, reset the buffer, and break out of the loop - if (currByte == '\n') { - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), - socketChannel.socket().getInetAddress().toString()); - logger.trace(event.getFullMessage()); - syslogEvents.put(event); // block until space is available - currBytes.reset(); - } + boolean eof = false; + SocketChannel 
socketChannel = null; + ByteBuffer socketBuffer = null; + try { + int bytesRead; + socketChannel = (SocketChannel) key.channel(); + socketBuffer = (ByteBuffer) key.attachment(); + // read until the buffer is full + while((bytesRead = socketChannel.read(socketBuffer)) > 0){ + socketBuffer.flip(); + socketBuffer.mark(); + int total = socketBuffer.remaining(); + // go through the buffer looking for the end of each message + for (int i = 0; i < total; i++) { + byte currByte = socketBuffer.get(); + currBytes.write(currByte); + // at the end of a message so parse an event, reset the buffer, and break out of the loop + if (currByte == '\n') { - final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray()); ++ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(), ++ socketChannel.socket().getInetAddress().toString()); + logger.trace(event.getFullMessage()); + syslogEvents.put(event); // block until space is available + currBytes.reset(); + socketBuffer.mark(); } - } finally { - bufferPool.returnBuffer(buffer, 0); } + socketBuffer.reset(); + socketBuffer.compact(); + logger.debug("done handling SocketChannel"); + } + if( bytesRead < 0 ){ + eof = true; } - - logger.debug("done handling SocketChannel"); } catch (ClosedByInterruptException | InterruptedException e) { - // nothing to do here + logger.debug("read loop interrupted, closing connection"); + eof = true; } catch (IOException e) { logger.error("Error reading from channel", e); + eof = true; } finally { - IOUtils.closeQuietly(socketChannel); + if(eof == true){ + dispatcher.completeConnection(key); + IOUtils.closeQuietly(socketChannel); + } + else { + dispatcher.addBackForSelection(key); + } } }
