[
https://issues.apache.org/jira/browse/NIFI-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167241#comment-15167241
]
ASF GitHub Bot commented on NIFI-1420:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/233#discussion_r54099600
--- Diff:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/ListenSplunkForwarder.java
---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import
org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
+import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.StandardEvent;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import
org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"listen", "splunk", "tcp", "udp", "logs"})
+@CapabilityDescription("Listens for data from a Splunk forwarder.")
+@WritesAttributes({
+ @WritesAttribute(attribute="splunk.sender", description="The
sending host of the messages."),
+ @WritesAttribute(attribute="splunk.port", description="The sending
port the messages were received over."),
+ @WritesAttribute(attribute="mime.type", description="The mime.type
of the messages which is text/plain.")
+})
+public class ListenSplunkForwarder extends
AbstractListenEventProcessor<ListenSplunkForwarder.SplunkEvent> {
+
+ public static final AllowableValue TCP_VALUE = new
AllowableValue("TCP", "TCP");
+ public static final AllowableValue UDP_VALUE = new
AllowableValue("UDP", "UDP");
+
+ public static final PropertyDescriptor PROTOCOL = new
PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol for communication.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(TCP_VALUE.getValue())
+ .build();
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The Controller Service to use in order to obtain
an SSL Context. If this property is set, " +
+ "messages will be received over a secure connection.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ // it is only the array reference that is volatile - not the contents.
+ private volatile byte[] messageDemarcatorBytes;
+
+ @Override
+ protected List<PropertyDescriptor> getAdditionalProperties() {
+ return Arrays.asList(
+ PROTOCOL,
+ MAX_CONNECTIONS,
+ MAX_BATCH_SIZE,
+ MESSAGE_DELIMITER,
+ SSL_CONTEXT_SERVICE
+ );
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ super.onScheduled(context);
+ final String msgDemarcator =
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n",
"\n").replace("\\r", "\r").replace("\\t", "\t");
+ messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+ }
+
+ @Override
+ protected ChannelDispatcher createDispatcher(final ProcessContext
context, final BlockingQueue<SplunkEvent> events)
+ throws IOException {
+
+ final String protocol = context.getProperty(PROTOCOL).getValue();
+ final int maxConnections =
context.getProperty(MAX_CONNECTIONS).asInteger();
+ final int bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
+
+ // initialize the buffer pool based on max number of connections
and the buffer size
+ final LinkedBlockingQueue<ByteBuffer> bufferPool = new
LinkedBlockingQueue<>(maxConnections);
+ for (int i = 0; i < maxConnections; i++) {
+ bufferPool.offer(ByteBuffer.allocate(bufferSize));
+ }
+
+ final EventFactory<SplunkEvent> eventFactory = new
SplunkEventFactory();
+
+ if (UDP_VALUE.getValue().equals(protocol)) {
+ return new DatagramChannelDispatcher(eventFactory, bufferPool,
events, getLogger());
+ } else {
+ // if an SSLContextService was provided then create an
SSLContext to pass down to the dispatcher
+ SSLContext sslContext = null;
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ sslContext =
sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
+ }
+
+ final ChannelHandlerFactory<SplunkEvent<SocketChannel>,
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
+ return new SocketChannelDispatcher(eventFactory,
handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext,
charSet);
+ }
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final int maxBatchSize =
context.getProperty(MAX_BATCH_SIZE).asInteger();
--- End diff --
Considering that this uses a batching approach similar to the one in
PutKafka, I'm curious whether you tested it with run-duration > 0. See the
comments in https://issues.apache.org/jira/browse/NIFI-1534, as this processor
may suffer from the same issue.
> Splunk Processors
> -----------------
>
> Key: NIFI-1420
> URL: https://issues.apache.org/jira/browse/NIFI-1420
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 0.6.0
>
>
> To continue improving NiFi's ability to collect logs, a good integration
> point would be to have a processor that could listen for data from a Splunk
> forwarder (https://docs.splunk.com/Splexicon:Universalforwarder). Being able
> to push log messages to Splunk would also be useful.
> Splunk provides an SDK that may be helpful:
> https://github.com/splunk/splunk-sdk-java
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)