http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java new file mode 100644 index 0000000..20102ba --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.response.socket; + +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.io.IOException; +import java.nio.channels.SocketChannel; + +/** + * A ChannelResponder for SSLSocketChannels. 
+ */ +public class SSLSocketChannelResponder extends SocketChannelResponder { + + private SSLSocketChannel sslSocketChannel; + + public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) { + super(socketChannel); + this.sslSocketChannel = sslSocketChannel; + } + + @Override + public void respond() throws IOException { + for (final ChannelResponse response : responses) { + sslSocketChannel.write(response.toByteArray()); + } + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java new file mode 100644 index 0000000..5c20bf0 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.listen.response.socket; + +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A ChannelResponder for SocketChannels. The SocketChannel should first be registered with a selector, + * upon being selected for writing the respond() method should be executed. + */ +public class SocketChannelResponder implements ChannelResponder<SocketChannel> { + + protected final List<ChannelResponse> responses; + protected final SocketChannel socketChannel; + + public SocketChannelResponder(final SocketChannel socketChannel) { + this.responses = new ArrayList<>(); + this.socketChannel = socketChannel; + } + + @Override + public SocketChannel getChannel() { + return socketChannel; + } + + @Override + public List<ChannelResponse> getResponses() { + return Collections.unmodifiableList(responses); + } + + @Override + public void addResponse(ChannelResponse response) { + this.responses.add(response); + } + + @Override + public void respond() throws IOException { + for (final ChannelResponse response : responses) { + final ByteBuffer responseBuffer = ByteBuffer.wrap(response.toByteArray()); + + while (responseBuffer.hasRemaining()) { + socketChannel.write(responseBuffer); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java index 70d3f22..e51ba6c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.util.StandardValidators; @@ -32,7 +31,7 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor { public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor .Builder().name("Protocol") - .description("The protocol for Syslog communication, either TCP or UDP.") + .description("The protocol for Syslog communication.") .required(true) .allowableValues(TCP_VALUE, UDP_VALUE) .defaultValue(UDP_VALUE.getValue()) @@ -45,39 +44,11 @@ public abstract class AbstractSyslogProcessor extends AbstractProcessor { .build(); public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() .name("Character Set") - .description("Specifies which character set of the Syslog messages") + .description("Specifies the character set of the Syslog messages") .required(true) .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); - /** - * FlowFile Attributes for each Syslog message. 
- */ - public enum SyslogAttributes implements FlowFileAttributeKey { - PRIORITY("syslog.priority"), - SEVERITY("syslog.severity"), - FACILITY("syslog.facility"), - VERSION("syslog.version"), - TIMESTAMP("syslog.timestamp"), - HOSTNAME("syslog.hostname"), - SENDER("syslog.sender"), - BODY("syslog.body"), - VALID("syslog.valid"), - PROTOCOL("syslog.protocol"), - PORT("syslog.port"); - - private String key; - - SyslogAttributes(String key) { - this.key = key; - } - - @Override - public String key() { - return key; - } - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java index a4317dc..ecc05ee 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java @@ -125,7 +125,7 @@ public class HandleHttpResponse extends AbstractProcessor { final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue(); if (!isNumber(statusCodeValue)) { session.transfer(flowFile, REL_FAILURE); - getLogger().error("Failed to response to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue}); + getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue}); 
} final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java new file mode 100644 index 0000000..99e1830 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.standard.relp.event.RELPEvent; +import org.apache.nifi.processors.standard.relp.event.RELPEventFactory; +import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; +import org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory; +import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse; +import org.apache.nifi.processors.standard.relp.response.RELPResponse; +import 
org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"listen", "relp", "tcp", "logs"}) +@CapabilityDescription("Listens for RELP messages being sent to a given port over TCP. Each message will be " + + "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " + + "portion of one or more RELP frames. In the case where the RELP frames contain syslog messages, the " + + "output of this processor can be sent to a ParseSyslog processor for further processing.") +@WritesAttributes({ + @WritesAttribute(attribute="relp.command", description="The command of the RELP frames."), + @WritesAttribute(attribute="relp.sender", description="The sending host of the messages."), + @WritesAttribute(attribute="relp.port", description="The sending port the messages were received over."), + @WritesAttribute(attribute="relp.txnr", description="The transaction number of the message. Only included if <Batch Size> is 1."), + @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain") + }) +@SeeAlso({ParseSyslog.class}) +public class ListenRELP extends AbstractListenEventProcessor<RELPEvent> { + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. 
If this property is set, " + + "messages will be received over a secure connection.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + private volatile RELPEncoder relpEncoder; + private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents. + + @Override + protected List<PropertyDescriptor> getAdditionalProperties() { + return Arrays.asList(MAX_CONNECTIONS, MAX_BATCH_SIZE, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE); + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + // wanted to ensure charset was already populated here + relpEncoder = new RELPEncoder(charset); + + final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + messageDemarcatorBytes = msgDemarcator.getBytes(charset); + } + + @Override + protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException { + final EventFactory<RELPEvent> eventFactory = new RELPEventFactory(); + final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>(); + + final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); + final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); + + // initialize the buffer pool based on max number of connections and the buffer size + final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections); + for (int i = 0; i < maxConnections; i++) { + bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + + // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher + SSLContext sslContext = null; + final 
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + } + + // if we decide to support SSL then get the context and pass it in here + return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events, + getLogger(), maxConnections, sslContext, charSet); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); + + // if the size is 0 then there was nothing to process so yield and return + if (batches.size() == 0) { + context.yield(); + return; + } + + for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) { + FlowFile flowFile = entry.getValue().getFlowFile(); + final List<RELPEvent> events = entry.getValue().getEvents(); + + if (flowFile.getSize() == 0L || events.size() == 0) { + session.remove(flowFile); + getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); + continue; + } + + // the sender and command will be the same for all events based on the batch key + final String sender = events.get(0).getSender(); + final String command = events.get(0).getCommand(); + + final int numAttributes = events.size() == 1 ? 
5 : 4; + + final Map<String,String> attributes = new HashMap<>(numAttributes); + attributes.put(RELPAttributes.COMMAND.key(), command); + attributes.put(RELPAttributes.SENDER.key(), sender); + attributes.put(RELPAttributes.PORT.key(), String.valueOf(port)); + attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain"); + + // if there was only one event then we can pass on the transaction + // NOTE: we could pass on all the transaction ids joined together + if (events.size() == 1) { + attributes.put(RELPAttributes.TXNR.key(), String.valueOf(events.get(0).getTxnr())); + } + flowFile = session.putAllAttributes(flowFile, attributes); + + getLogger().debug("Transferring {} to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + + // create a provenance receive event + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":") + .append(port).toString(); + session.getProvenanceReporter().receive(flowFile, transitUri); + + // commit the session to guarantee the data has been delivered + session.commit(); + + // respond to each event to acknowledge successful receipt + for (final RELPEvent event : events) { + respond(event, RELPResponse.ok(event.getTxnr())); + } + } + } + + @Override + protected String getBatchKey(RELPEvent event) { + return event.getSender() + "_" + event.getCommand(); + } + + protected void respond(final RELPEvent event, final RELPResponse relpResponse) { + final ChannelResponse response = new RELPChannelResponse(relpEncoder, relpResponse); + + final ChannelResponder responder = event.getResponder(); + responder.addResponse(response); + try { + responder.respond(); + } catch (IOException e) { + getLogger().error("Error sending response for transaction {} due to {}", + new Object[] {event.getTxnr(), e.getMessage()}, e); + } + } + + public enum RELPAttributes implements 
FlowFileAttributeKey { + TXNR("relp.txnr"), + COMMAND("relp.command"), + SENDER("relp.sender"), + PORT("relp.port"); + + private final String key; + + RELPAttributes(String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index e1e0e91..2e439b4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.processors.standard; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -31,7 +30,6 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -40,26 +38,25 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import 
org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.processors.standard.syslog.SyslogEvent; +import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; import java.io.IOException; import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.StandardSocketOptions; import java.nio.ByteBuffer; -import java.nio.channels.Channel; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; @@ -67,16 +64,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import 
java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @@ -175,7 +168,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { private Set<Relationship> relationships; private List<PropertyDescriptor> descriptors; - private volatile ChannelReader channelReader; + private volatile ChannelDispatcher channelDispatcher; private volatile SyslogParser parser; private volatile BlockingQueue<ByteBuffer> bufferPool; private volatile BlockingQueue<RawSyslogEvent> syslogEvents = new LinkedBlockingQueue<>(10); @@ -255,11 +248,10 @@ public class ListenSyslog extends AbstractSyslogProcessor { final String protocol = context.getProperty(PROTOCOL).getValue(); final String charSet = context.getProperty(CHARSET).getValue(); final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - final String charsetName = context.getProperty(CHARSET).getValue(); - messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charsetName)); + messageDemarcatorBytes = msgDemarcator.getBytes(Charset.forName(charSet)); final int maxConnections; - if (protocol.equals(UDP_VALUE.getValue())) { + if (UDP_VALUE.getValue().equals(protocol)) { maxConnections = 1; } else { maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); @@ -274,10 +266,10 @@ public class ListenSyslog extends AbstractSyslogProcessor { // create either a UDP or TCP reader and call open() to bind to the given port final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - channelReader = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService); - channelReader.open(port, maxChannelBufferSize); + channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); + channelDispatcher.open(port, maxChannelBufferSize); - final Thread readerThread = new Thread(channelReader); + final Thread readerThread = new Thread(channelDispatcher); readerThread.setName("ListenSyslog [" + getIdentifier() + "]"); readerThread.setDaemon(true); readerThread.start(); @@ -288,12 +280,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { return parser; } - // visible for testing to be overridden and provide a mock ChannelReader if desired - protected ChannelReader createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, - int maxConnections, final SSLContextService sslContextService) - throws IOException { - if (protocol.equals(UDP_VALUE.getValue())) { - return new DatagramChannelReader(bufferPool, syslogEvents, getLogger()); + // visible for testing to be overridden and provide a mock ChannelDispatcher if desired + protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, + final BlockingQueue<RawSyslogEvent> events, final int maxConnections, + final SSLContextService sslContextService, final Charset charset) throws IOException { + + final EventFactory<RawSyslogEvent> eventFactory = new RawSyslogEventFactory(); + + if (UDP_VALUE.getValue().equals(protocol)) { + return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger()); } else { // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; @@ -301,20 +296,21 @@ public class 
ListenSyslog extends AbstractSyslogProcessor { sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); } - return new SocketChannelDispatcher(bufferPool, syslogEvents, getLogger(), maxConnections, sslContext); + final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); + return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset); } } // used for testing to access the random port that was selected protected int getPort() { - return channelReader == null ? 0 : channelReader.getPort(); + return channelDispatcher == null ? 0 : channelDispatcher.getPort(); } @OnUnscheduled public void onUnscheduled() { - if (channelReader != null) { - channelReader.stop(); - channelReader.close(); + if (channelDispatcher != null) { + channelDispatcher.stop(); + channelDispatcher.close(); } } @@ -394,7 +390,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { if (shouldParse) { boolean valid = true; try { - event = parser.parseEvent(rawSyslogEvent.getRawMessage(), sender); + event = parser.parseEvent(rawSyslogEvent.getData(), sender); } catch (final ProcessException pe) { getLogger().warn("Failed to parse Syslog event; routing to invalid"); valid = false; @@ -411,7 +407,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } try { - final byte[] rawBytes = rawSyslogEvent.getRawMessage(); + final byte[] rawBytes = rawSyslogEvent.getData(); invalidFlowFile = session.write(invalidFlowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { @@ -449,7 +445,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { try { // write the raw bytes of the message as the FlowFile content - final byte[] rawMessage = (event == null) ? rawSyslogEvent.getRawMessage() : event.getRawMessage(); + final byte[] rawMessage = (event == null) ? 
rawSyslogEvent.getData() : event.getRawMessage(); flowFile = session.append(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { @@ -495,505 +491,47 @@ public class ListenSyslog extends AbstractSyslogProcessor { } /** - * Reads messages from a channel until told to stop. - */ - private interface ChannelReader extends Runnable { - - void open(int port, int maxBufferSize) throws IOException; - - int getPort(); - - void stop(); - - void close(); - } - - /** - * Reads from the Datagram channel into an available buffer. If data is read then the buffer is queued for - * processing, otherwise the buffer is returned to the buffer pool. - */ - private static class DatagramChannelReader implements ChannelReader { - - private final BlockingQueue<ByteBuffer> bufferPool; - private final BlockingQueue<RawSyslogEvent> syslogEvents; - private final ProcessorLog logger; - private DatagramChannel datagramChannel; - private volatile boolean stopped = false; - private Selector selector; - - public DatagramChannelReader(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) { - this.bufferPool = bufferPool; - this.syslogEvents = syslogEvents; - this.logger = logger; - } - - @Override - public void open(final int port, int maxBufferSize) throws IOException { - datagramChannel = DatagramChannel.open(); - datagramChannel.configureBlocking(false); - if (maxBufferSize > 0) { - datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); - final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < maxBufferSize) { - logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize); - } - } - datagramChannel.socket().bind(new InetSocketAddress(port)); - selector = Selector.open(); - datagramChannel.register(selector, SelectionKey.OP_READ); - } - - @Override - public void run() { - final 
ByteBuffer buffer = bufferPool.poll(); - while (!stopped) { - try { - int selected = selector.select(); - if (selected > 0){ - Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()) { - SelectionKey key = selectorKeys.next(); - selectorKeys.remove(); - if (!key.isValid()) { - continue; - } - DatagramChannel channel = (DatagramChannel) key.channel(); - SocketAddress socketAddress; - buffer.clear(); - while (!stopped && (socketAddress = channel.receive(buffer)) != null) { - String sender = ""; - if (socketAddress instanceof InetSocketAddress) { - sender = ((InetSocketAddress) socketAddress).getAddress().toString(); - } - - // create a byte array from the buffer - buffer.flip(); - byte bytes[] = new byte[buffer.limit()]; - buffer.get(bytes, 0, buffer.limit()); - - // queue the raw message with the sender, block until space is available - syslogEvents.put(new RawSyslogEvent(bytes, sender)); - buffer.clear(); - } - } - } - } catch (InterruptedException e) { - stopped = true; - } catch (IOException e) { - logger.error("Error reading from DatagramChannel", e); - } - } - - if (buffer != null) { - try { - bufferPool.put(buffer); - } catch (InterruptedException e) { - // nothing to do here - } - } - } - - @Override - public int getPort() { - return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort(); - } - - @Override - public void stop() { - selector.wakeup(); - stopped = true; - } - - @Override - public void close() { - IOUtils.closeQuietly(selector); - IOUtils.closeQuietly(datagramChannel); - } - } - - /** - * Accepts Socket connections on the given port and creates a handler for each connection to - * be executed by a thread pool. 
+ * Wrapper class to pass around the raw message and the host/ip that sent it */ - private static class SocketChannelDispatcher implements ChannelReader { - - private final BlockingQueue<ByteBuffer> bufferPool; - private final BlockingQueue<RawSyslogEvent> syslogEvents; - private final ProcessorLog logger; - private final ExecutorService executor; - private volatile boolean stopped = false; - private Selector selector; - private final BlockingQueue<SelectionKey> keyQueue; - private final int maxConnections; - private final AtomicInteger currentConnections = new AtomicInteger(0); - private final SSLContext sslContext; - - public SocketChannelDispatcher(final BlockingQueue<ByteBuffer> bufferPool, final BlockingQueue<RawSyslogEvent> syslogEvents, - final ProcessorLog logger, final int maxConnections, final SSLContext sslContext) { - this.bufferPool = bufferPool; - this.syslogEvents = syslogEvents; - this.logger = logger; - this.maxConnections = maxConnections; - this.keyQueue = new LinkedBlockingQueue<>(maxConnections); - this.sslContext = sslContext; - this.executor = Executors.newFixedThreadPool(maxConnections); - } - - @Override - public void open(final int port, int maxBufferSize) throws IOException { - final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.configureBlocking(false); - if (maxBufferSize > 0) { - serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize); - final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < maxBufferSize) { - logMaxBufferWarning(logger, maxBufferSize, actualReceiveBufSize); - } - } - serverSocketChannel.socket().bind(new InetSocketAddress(port)); - selector = Selector.open(); - serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); - } + static class RawSyslogEvent<C extends SelectableChannel> implements Event<C> { - @Override - public void run() { - while (!stopped) { - try { - int 
selected = selector.select(); - if (selected > 0){ - Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator(); - while (selectorKeys.hasNext()){ - SelectionKey key = selectorKeys.next(); - selectorKeys.remove(); - if (!key.isValid()){ - continue; - } - if (key.isAcceptable()) { - // Handle new connections coming in - final ServerSocketChannel channel = (ServerSocketChannel) key.channel(); - final SocketChannel socketChannel = channel.accept(); - // Check for available connections - if (currentConnections.incrementAndGet() > maxConnections){ - currentConnections.decrementAndGet(); - logger.warn("Rejecting connection from {} because max connections has been met", - new Object[]{ socketChannel.getRemoteAddress().toString() }); - IOUtils.closeQuietly(socketChannel); - continue; - } - logger.debug("Accepted incoming connection from {}", - new Object[]{socketChannel.getRemoteAddress().toString()}); - // Set socket to non-blocking, and register with selector - socketChannel.configureBlocking(false); - SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - - // Prepare the byte buffer for the reads, clear it out - ByteBuffer buffer = bufferPool.poll(); - buffer.clear(); - buffer.mark(); - - // If we have an SSLContext then create an SSLEngine for the channel - SSLEngine sslEngine = null; - if (sslContext != null) { - sslEngine = sslContext.createSSLEngine(); - } - - // Attach the buffer and SSLEngine to the key - SocketChannelAttachment attachment = new SocketChannelAttachment(buffer, sslEngine); - readKey.attach(attachment); - } else if (key.isReadable()) { - // Clear out the operations the select is interested in until done reading - key.interestOps(0); - // Create a handler based on whether an SSLEngine was provided or not - final Runnable handler; - if (sslContext != null) { - handler = new SSLSocketChannelHandler(key, this, syslogEvents, logger); - } else { - handler = new SocketChannelHandler(key, this, syslogEvents, logger); 
- } - // run the handler - executor.execute(handler); - } - } - } - // Add back all idle sockets to the select - SelectionKey key; - while((key = keyQueue.poll()) != null){ - key.interestOps(SelectionKey.OP_READ); - } - } catch (IOException e) { - logger.error("Error accepting connection from SocketChannel", e); - } - } - } + final byte[] rawMessage; + final String sender; - @Override - public int getPort() { - // Return the port for the key listening for accepts - for(SelectionKey key : selector.keys()){ - if (key.isValid()) { - final Channel channel = key.channel(); - if (channel instanceof ServerSocketChannel) { - return ((ServerSocketChannel)channel).socket().getLocalPort(); - } - } - } - return 0; + public RawSyslogEvent(final byte[] rawMessage, final String sender) { + this.rawMessage = rawMessage; + this.sender = sender; } @Override - public void stop() { - stopped = true; - selector.wakeup(); + public byte[] getData() { + return this.rawMessage; } @Override - public void close() { - executor.shutdown(); - try { - // Wait a while for existing tasks to terminate - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - executor.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - for(SelectionKey key : selector.keys()){ - IOUtils.closeQuietly(key.channel()); - } - IOUtils.closeQuietly(selector); - } - - public void completeConnection(SelectionKey key) { - // connection is done. 
Return the buffer to the pool - try { - SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - bufferPool.put(attachment.getByteBuffer()); - } catch (InterruptedException e) { - // nothing to do here - } - currentConnections.decrementAndGet(); - } - - public void addBackForSelection(SelectionKey key) { - keyQueue.offer(key); - selector.wakeup(); - } - - } - - /** - * Reads from the given SocketChannel into the provided buffer. If data is read then the buffer is queued for - * processing, otherwise the buffer is returned to the buffer pool. - */ - private static class SocketChannelHandler implements Runnable { - - private final SelectionKey key; - private final SocketChannelDispatcher dispatcher; - private final BlockingQueue<RawSyslogEvent> syslogEvents; - private final ProcessorLog logger; - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public SocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) { - this.key = key; - this.dispatcher = dispatcher; - this.syslogEvents = syslogEvents; - this.logger = logger; + public String getSender() { + return this.sender; } @Override - public void run() { - boolean eof = false; - SocketChannel socketChannel = null; - - try { - int bytesRead; - socketChannel = (SocketChannel) key.channel(); - - SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - ByteBuffer socketBuffer = attachment.getByteBuffer(); - - // read until the buffer is full - while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { - // prepare byte buffer for reading - socketBuffer.flip(); - // mark the current position as start, in case of partial message read - socketBuffer.mark(); - - // get total bytes in buffer - int total = socketBuffer.remaining(); - // go through the buffer looking for the end of each message - currBytes.reset(); - for (int i = 0; i < 
total; i++) { - // NOTE: For higher throughput, the looking for \n and copying into the byte - // stream could be improved - // Pull data out of buffer and cram into byte array - byte currByte = socketBuffer.get(); - currBytes.write(currByte); - - // check if at end of a message - if (currByte == '\n') { - String sender = socketChannel.socket().getInetAddress().toString(); - // queue the raw event blocking until space is available, reset the buffer - syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), sender)); - currBytes.reset(); - // Mark this as the start of the next message - socketBuffer.mark(); - } - } - // Preserve bytes in buffer for next call to run - // NOTE: This code could benefit from the two ByteBuffer read calls to avoid - // this compact for higher throughput - socketBuffer.reset(); - socketBuffer.compact(); - logger.debug("done handling SocketChannel"); - } - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(socketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } - } + public ChannelResponder getResponder() { + return null; } } /** - * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over TLS. + * EventFactory implementation for RawSyslogEvent. 
*/ - private static class SSLSocketChannelHandler implements Runnable { - - private final SelectionKey key; - private final SocketChannelDispatcher dispatcher; - private final BlockingQueue<RawSyslogEvent> syslogEvents; - private final ProcessorLog logger; - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public SSLSocketChannelHandler(final SelectionKey key, final SocketChannelDispatcher dispatcher, final BlockingQueue<RawSyslogEvent> syslogEvents, final ProcessorLog logger) { - this.key = key; - this.dispatcher = dispatcher; - this.syslogEvents = syslogEvents; - this.logger = logger; - } + private static class RawSyslogEventFactory implements EventFactory<RawSyslogEvent> { @Override - public void run() { - boolean eof = false; - SSLSocketChannel sslSocketChannel = null; - try { - int bytesRead; - final SocketChannel socketChannel = (SocketChannel) key.channel(); - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - - // wrap the SocketChannel with an SSLSocketChannel using the SSLEngine from the attachment - sslSocketChannel = new SSLSocketChannel(attachment.getSslEngine(), socketChannel, false); - - // SSLSocketChannel deals with byte[] so ByteBuffer isn't used here, but we'll use the size to create a new byte[] - final ByteBuffer socketBuffer = attachment.getByteBuffer(); - byte[] socketBufferArray = new byte[socketBuffer.limit()]; - - // read until no more data - while ((bytesRead = sslSocketChannel.read(socketBufferArray)) > 0) { - // go through the buffer looking for the end of each message - for (int i = 0; i < bytesRead; i++) { - final byte currByte = socketBufferArray[i]; - currBytes.write(currByte); - - // check if at end of a message - if (currByte == '\n') { - final String sender = socketChannel.socket().getInetAddress().toString(); - // queue the raw event blocking until space is available, reset the temporary buffer - syslogEvents.put(new RawSyslogEvent(currBytes.toByteArray(), 
sender)); - currBytes.reset(); - } - } - logger.debug("done handling SocketChannel"); - } - - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(sslSocketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } + public RawSyslogEvent create(byte[] data, Map<String, String> metadata, final ChannelResponder responder) { + String sender = null; + if (metadata != null && metadata.containsKey(EventFactory.SENDER_KEY)) { + sender = metadata.get(EventFactory.SENDER_KEY); } + return new RawSyslogEvent(data, sender); } } - static void logMaxBufferWarning(final ProcessorLog logger, int maxBufferSize, int actualReceiveBufSize) { - logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to " - + actualReceiveBufSize + "bytes. 
You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - - // Wrapper class to pass around the raw message and the host/ip that sent it - static class RawSyslogEvent { - - final byte[] rawMessage; - final String sender; - - public RawSyslogEvent(byte[] rawMessage, String sender) { - this.rawMessage = rawMessage; - this.sender = sender; - } - - public byte[] getRawMessage() { - return this.rawMessage; - } - - public String getSender() { - return this.sender; - } - - } - - // Wrapper class so we can attach a buffer and/or an SSLEngine to the selector key - private static class SocketChannelAttachment { - - private final ByteBuffer byteBuffer; - private final SSLEngine sslEngine; - - public SocketChannelAttachment(ByteBuffer byteBuffer, SSLEngine sslEngine) { - this.byteBuffer = byteBuffer; - this.sslEngine = sslEngine; - } - - public ByteBuffer getByteBuffer() { - return byteBuffer; - } - - public SSLEngine getSslEngine() { - return sslEngine; - } - - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java index 1490cc2..90fa816 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog.java @@ -46,9 +46,9 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import 
org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.processors.standard.syslog.SyslogEvent; +import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.stream.io.StreamUtils; http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java index 2089956..e555a0c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java @@ -35,7 +35,7 @@ import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.ObjectHolder; 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java new file mode 100644 index 0000000..e877ea2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.event; + +import org.apache.nifi.processor.util.listen.event.StandardEvent; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.nio.channels.SocketChannel; + +/** + * A RELP event which adds the transaction number and command to the StandardEvent. 
+ */ +public class RELPEvent extends StandardEvent<SocketChannel> { + + private final long txnr; + private final String command; + + public RELPEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final long txnr, final String command) { + super(sender, data, responder); + this.txnr = txnr; + this.command = command; + } + + public long getTxnr() { + return txnr; + } + + public String getCommand() { + return command; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java new file mode 100644 index 0000000..22eba01 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.event; + +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; + +import java.util.Map; + +/** + * An EventFactory implementation to create RELPEvents. + */ +public class RELPEventFactory implements EventFactory<RELPEvent> { + + @Override + public RELPEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { + final long txnr = Long.valueOf(metadata.get(RELPMetadata.TXNR_KEY)); + final String command = metadata.get(RELPMetadata.COMMAND_KEY); + final String sender = metadata.get(EventFactory.SENDER_KEY); + return new RELPEvent(sender, data, responder, txnr, command); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java new file mode 100644 index 0000000..88051c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.event; + +/** + * Metadata keys for RELP. + */ +public interface RELPMetadata { + + String TXNR_KEY = "relp.txnr"; + String COMMAND_KEY = "relp.command"; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java new file mode 100644 index 0000000..cc7fa28 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; + +/** + * Decodes a RELP frame by maintaining a state based on each byte that has been processed. This class + * should not be shared by multiple threads. + */ +public class RELPDecoder { + + static final Logger logger = LoggerFactory.getLogger(RELPDecoder.class); + + private RELPFrame.Builder frameBuilder; + private RELPState currState = RELPState.TXNR; + + private final Charset charset; + private final ByteArrayOutputStream currBytes; + + /** + * @param charset the charset to decode bytes from the RELP frame + */ + public RELPDecoder(final Charset charset) { + this(charset, new ByteArrayOutputStream(4096)); + } + + /** + * + * @param charset the charset to decode bytes from the RELP frame + * @param buffer a buffer to use while processing the bytes + */ + public RELPDecoder(final Charset charset, final ByteArrayOutputStream buffer) { + this.charset = charset; + this.currBytes = buffer; + this.frameBuilder = new RELPFrame.Builder(); + } + + /** + * Resets this decoder back to its initial state. + */ + public void reset() { + frameBuilder = new RELPFrame.Builder(); + currState = RELPState.TXNR; + currBytes.reset(); + } + + /** + * Process the next byte from the channel, updating the builder and state accordingly.
+ * + * @param currByte the next byte to process + * @return true if a frame is ready to be retrieved, false otherwise + */ + public boolean process(final byte currByte) throws RELPFrameException { + try { + switch (currState) { + case TXNR: + processTXNR(currByte); + break; + case COMMAND: + processCOMMAND(currByte); + break; + case LENGTH: + processLENGTH(currByte); + // if jumped from length to trailer we need to return true here + // because there might not be another byte to process + if (currState == RELPState.TRAILER) { + return true; + } + break; + case DATA: + processDATA(currByte); + break; + case TRAILER: + return true; + default: + break; + } + return false; + } catch (Exception e) { + throw new RELPFrameException("Error decoding RELP frame: " + e.getMessage(), e); + } + } + + /** + * Returns the decoded frame and resets the decoder for the next frame. + * This method should be called only after process() has returned true. + * + * @return the RELPFrame that was decoded + */ + public RELPFrame getFrame() throws RELPFrameException { + if (currState != RELPState.TRAILER) { + throw new RELPFrameException("Must be at the trailer of a frame"); + } + + try { + final RELPFrame frame = frameBuilder.build(); + processTRAILER(RELPFrame.DELIMITER); + return frame; + } catch (Exception e) { + throw new RELPFrameException("Error decoding RELP frame: " + e.getMessage(), e); + } + } + + + private void processTXNR(final byte b) { + if (b == RELPFrame.SEPARATOR) { + if (currBytes.size() > 0) { + final long txnr = Long.parseLong(new String(currBytes.toByteArray(), charset)); + frameBuilder.txnr(txnr); + logger.debug("Transaction number is {}", new Object[]{txnr}); + + currBytes.reset(); + currState = RELPState.COMMAND; + } + } else { + currBytes.write(b); + } + } + + private void processCOMMAND(final byte b) { + if (b == RELPFrame.SEPARATOR) { + final String command = new String(currBytes.toByteArray(), charset); + frameBuilder.command(command); + logger.debug("Command is {}", new
Object[] {command}); + + currBytes.reset(); + currState = RELPState.LENGTH; + } else { + currBytes.write(b); + } + } + + private void processLENGTH(final byte b) { + if (b == RELPFrame.SEPARATOR || (currBytes.size() > 0 && b == RELPFrame.DELIMITER)) { + final int dataLength = Integer.parseInt(new String(currBytes.toByteArray(), charset)); + frameBuilder.dataLength(dataLength); + logger.debug("Length is {}", new Object[] {dataLength}); + + currBytes.reset(); + + // if at a separator then data is going to follow, but if at a delimiter there is no data + if (b == RELPFrame.SEPARATOR) { + currState = RELPState.DATA; + } else { + frameBuilder.data(new byte[0]); + currState = RELPState.TRAILER; + } + } else { + currBytes.write(b); + } + } + + private void processDATA(final byte b) { + currBytes.write(b); + logger.trace("Data size is {}", new Object[] {currBytes.size()}); + + if (currBytes.size() >= frameBuilder.dataLength) { + final byte[] data = currBytes.toByteArray(); + frameBuilder.data(data); + logger.debug("Reached expected data size of {}", new Object[] {frameBuilder.dataLength}); + + currBytes.reset(); + currState = RELPState.TRAILER; + } + } + + private void processTRAILER(final byte b) { + if (b != RELPFrame.DELIMITER) { + logger.warn("Expected RELP trailing LF, but found another byte"); + } + currBytes.reset(); + frameBuilder.reset(); + currState = RELPState.TXNR; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java new file mode 100644 index
0000000..a36588a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPEncoder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; + +/** + * Encodes a RELPFrame into raw bytes using the given charset. 
+ */ +public class RELPEncoder { + + private final Charset charset; + + /** + * @param charset the charset used to turn the transaction number, command and data length into bytes + */ + public RELPEncoder(final Charset charset) { + this.charset = charset; + } + + /** + * @return the charset given to the constructor + */ + public Charset getCharset() { + return charset; + } + + /** + * Encodes the frame as the transaction number, SEPARATOR, the command, SEPARATOR and the data length, followed by SEPARATOR and the data only when the data length is greater than zero, and finally DELIMITER. + * + * @param frame the frame to encode + * @return the encoded bytes + */ + public byte[] encode(final RELPFrame frame) { + final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + + // write transaction number followed by separator + byte[] txnr = String.format("%s", frame.getTxnr()).getBytes(charset); + buffer.write(txnr, 0, txnr.length); + buffer.write(RELPFrame.SEPARATOR); + + // write the command followed by separator + byte[] command = frame.getCommand().getBytes(charset); + buffer.write(command, 0, command.length); + buffer.write(RELPFrame.SEPARATOR); + + // write the data length + byte[] dataLength = String.format("%s", frame.getDataLength()).getBytes(charset); + buffer.write(dataLength, 0, dataLength.length); + + // if data to write then put a separator and write the data + if (frame.getDataLength() > 0) { + buffer.write(RELPFrame.SEPARATOR); + buffer.write(frame.getData(), 0, frame.getDataLength()); + } + + // write the end of the frame + buffer.write(RELPFrame.DELIMITER); + + return buffer.toByteArray(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java new file mode 100644 index 0000000..d763dda --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrame.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +import org.apache.commons.lang3.StringUtils; + +/** + * A RELP frame received from a channel. + */ +public class RELPFrame { + + /** Byte value 10 (ASCII line feed). */ + public static final byte DELIMITER = 10; + /** Byte value 32 (ASCII space). */ + public static final byte SEPARATOR = 32; + + private final long txnr; + private final int dataLength; + private final String command; + private final byte[] data; + + /** + * Takes the field values from the builder, substituting an empty array for null data, and throws RELPFrameException("Invalid Frame") if txnr or dataLength is negative, command is null or blank, or dataLength differs from data.length. + */ + private RELPFrame(final Builder builder) { + this.txnr = builder.txnr; + this.dataLength = builder.dataLength; + this.command = builder.command; + this.data = builder.data == null ? new byte[0] : builder.data; + + if (txnr < 0 || dataLength < 0 || command == null || StringUtils.isBlank(command) + || data == null || dataLength != data.length) { + throw new RELPFrameException("Invalid Frame"); + } + } + + public long getTxnr() { + return txnr; + } + + public int getDataLength() { + return dataLength; + } + + public String getCommand() { + return command; + } + + //NOTE: consider making a copy here if we want to truly be immutable; the internal array is returned as-is + public byte[] getData() { + return data; + } + + + /** + * Builder for a RELPFrame. 
+ */ + public static class Builder { + + long txnr; + int dataLength; + String command; + byte[] data; + + public Builder() { + reset(); + } + + /** + * Returns every field to its unset value: -1 for txnr and dataLength, null for command and data. + */ + public void reset() { + txnr = -1; + dataLength = -1; + command = null; + data = null; + } + + public Builder txnr(final long txnr) { + this.txnr = txnr; + return this; + } + + public Builder dataLength(final int dataLength) { + this.dataLength = dataLength; + return this; + } + + public Builder command(final String command) { + this.command = command; + return this; + } + + public Builder data(final byte[] data) { + this.data = data; + return this; + } + + /** + * Creates a RELPFrame from the current field values; the RELPFrame constructor throws RELPFrameException if they do not form a valid frame. + */ + public RELPFrame build() { + return new RELPFrame(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java new file mode 100644 index 0000000..dab01e5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +/** + * Represents an error encountered when decoding RELP frames. + */ +public class RELPFrameException extends RuntimeException { + + /** + * @param message description of the problem + */ + public RELPFrameException(String message) { + super(message); + } + + /** + * @param message description of the problem + * @param cause the underlying exception + */ + public RELPFrameException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java new file mode 100644 index 0000000..aabecfb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.frame; + +/** + * The parts of a RELP frame. + */ +public enum RELPState { + + /** The transaction number. */ + TXNR, + /** The command. */ + COMMAND, + /** The length of the data. */ + LENGTH, + /** The data itself. */ + DATA, + /** The byte that ends the frame. */ + TRAILER + +}
