[ https://issues.apache.org/jira/browse/NIFI-856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261643#comment-15261643 ]

ASF GitHub Bot commented on NIFI-856:
-------------------------------------

Github user trixpan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/290#discussion_r61381388
  
    --- Diff: nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java ---
    @@ -0,0 +1,211 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.lumberjack;
    +
    +import com.google.gson.Gson;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
    +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.EventFactory;
    +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponder;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponse;
    +import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
    +import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
    +import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
    +import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory;
    +import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse;
    +import org.apache.nifi.processors.lumberjack.response.LumberjackResponse;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "lumberjack", "tcp", "logs"})
    +@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
    +        "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain the data " +
    +        "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
    +        "output of this processor can be sent to a ParseSyslog processor for further processing.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="lumberjack.sender", description="The sending host of the messages."),
    +        @WritesAttribute(attribute="lumberjack.port", description="The sending port the messages were received over."),
    +        @WritesAttribute(attribute="lumberjack.sequencenumber", description="The sequence number of the message. Only included if <Batch Size> is 1."),
    +        @WritesAttribute(attribute="lumberjack.*", description="The keys and respective values as sent by the lumberjack producer. Only included if <Batch Size> is 1."),
    +        @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")
    +})
    +@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
    +public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    private volatile LumberjackEncoder lumberjackEncoder;
    +    private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
    +
    +
    +
    +    @Override
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE);
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) throws IOException {
    +        super.onScheduled(context);
    +        // wanted to ensure charset was already populated here
    +        lumberjackEncoder = new LumberjackEncoder();
    +    }
    +
    +    @Override
    +    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<LumberjackEvent> events) throws IOException {
    +        final EventFactory<LumberjackEvent> eventFactory = new LumberjackEventFactory();
    +        final ChannelHandlerFactory<LumberjackEvent,AsyncChannelDispatcher> handlerFactory = new LumberjackSocketChannelHandlerFactory<>();
    +
    +        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
    +        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    +        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        // initialize the buffer pool based on max number of connections and the buffer size
    +        final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize);
    +
    +        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
    +        SSLContext sslContext = null;
    +        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +        if (sslContextService != null) {
    +            sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
    +        }
    +        }
    +
    +        // if we decide to support SSL then get the context and pass it in here
    +        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events,
    --- End diff ---
    
    The logstash forwarder is quite picky around TLS auth, so I tried to remain faithful to the original behavior (which is well documented, with a number of howtos).
    
    Happy to revisit this once I add support for Lumberjack v2, as libbeat treats TLS as optional.
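    The mandatory-vs-optional TLS distinction above maps onto JSSE's server-side client-auth flag (the diff requests `SSLContextService.ClientAuth.REQUIRED`). A minimal JDK-only sketch, purely illustrative and not part of this PR (`ClientAuthSketch` and `serverEngine` are invented names):
    
    ```java
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLEngine;
    
    public class ClientAuthSketch {
        // Configure an SSLEngine in server mode. Lumberjack v1 behavior forces
        // client auth (REQUIRED); a libbeat/v2 path could leave it off.
        static SSLEngine serverEngine(SSLContext ctx, boolean requireClientAuth) {
            SSLEngine engine = ctx.createSSLEngine();
            engine.setUseClientMode(false);
            engine.setNeedClientAuth(requireClientAuth);
            return engine;
        }
    
        public static void main(String[] args) throws Exception {
            SSLEngine strict = serverEngine(SSLContext.getDefault(), true);
            SSLEngine lax = serverEngine(SSLContext.getDefault(), false);
            System.out.println(strict.getNeedClientAuth()); // true
            System.out.println(lax.getNeedClientAuth());    // false
        }
    }
    ```
    
    Keeping `REQUIRED` here matches the documented logstash-forwarder setups; an optional mode would only need the flag flipped when v2 support lands.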


> Add Processor for Lumberjack protocol
> -------------------------------------
>
>                 Key: NIFI-856
>                 URL: https://issues.apache.org/jira/browse/NIFI-856
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Mike de Rhino 
>              Labels: features
>         Attachments: NIFI-856.patch
>
>
> It would be great if NIFI could support the [lumberjack 
> protocol|https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md]
> so as to enable the use of logstash-forwarder as a source of data.
> A lot of non-Java shops tend to avoid installing Java on data-producing nodes 
> and, instead of Flume, they end up using things like Kafka, Heka, Fluentd or 
> logstash-forwarder as data shipping mechanisms. 
> Kafka is great, but its architecture seems to be better focused on multi-DC 
> environments than on multi-branch scenarios (imagine having to manage 80 
> Zookeeper quorums, one for each country where you operate).
> [Heka|https://github.com/mozilla-services/heka] is fine; it has decent 
> backpressure buffering but no concept of acknowledgement on the receiving 
> side of a TCP stream. If the other end of a TCP stream is capable of 
> listening but gets stuck with its messages, it will keep spitting data through 
> the pipe, oblivious to the woes at the other end.
> Logstash forwarder, on the other hand, is a quite simple tool with a 
> reasonable implementation of acknowledgments on the receiving side, but it 
> depends on Logstash (and Logstash has its own issues).
> It would be great if NIFI could serve as a middle man, receiving lumberjack 
> messages and offloading some of the hard work Logstash seems to struggle with 
> (e.g. using NIFI to save to HDFS while a downstream Logstash writes into ES).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
