[ 
https://issues.apache.org/jira/browse/NIFI-856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15307149#comment-15307149
 ] 

ASF GitHub Bot commented on NIFI-856:
-------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/290#discussion_r65119862
  
    --- Diff: nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.lumberjack;
    +
    +import com.google.gson.Gson;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
    +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.EventFactory;
    +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponder;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponse;
    +import org.apache.nifi.processors.lumberjack.event.LumberjackEvent;
    +import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory;
    +import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder;
    +import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory;
    +import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse;
    +import org.apache.nifi.processors.lumberjack.response.LumberjackResponse;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Collection;
    +import java.util.ArrayList;
    +import java.util.concurrent.BlockingQueue;
    +
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "lumberjack", "tcp", "logs"})
    +@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " +
    +        "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " +
    +        "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " +
    +        "output of this processor can be sent to a ParseSyslog processor for further processing.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="lumberjack.sender", description="The sending host of the messages."),
    +        @WritesAttribute(attribute="lumberjack.port", description="The sending port the messages were received over."),
    +        @WritesAttribute(attribute="lumberjack.sequencenumber", description="The sequence number of the message. Only included if <Batch Size> is 1."),
    +        @WritesAttribute(attribute="lumberjack.*", description="The keys and respective values as sent by the lumberjack producer. Only included if <Batch Size> is 1."),
    +        @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")
    +})
    +@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
    +public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> {
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection. Note that as Lumberjack client requires " +
    +                    "two-way SSL authentication, the controller MUST have a truststore and a keystore to work " +
    +                    "properly.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Arrays.asList(
    +                MAX_CONNECTIONS,
    +                SSL_CONTEXT_SERVICE
    +        );
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +
    +
    +
    +        if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) {
    +            results.add(new ValidationResult.Builder()
    +                    .explanation("The context service must have a truststore configured for the lumberjack forwarder client to work correctly")
    +                    .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
    +        }
    +
    +        return results;
    +    }
    +
    +    private volatile LumberjackEncoder lumberjackEncoder;
    +    private volatile byte[] messageDemarcatorBytes; //it is only the array reference that is volatile - not the contents.
    --- End diff --
    
    I believe this is unused in favor of the delimiter from the superclass.  Is that correct?
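
For reference, the inline comment on the flagged field ("it is only the array reference that is volatile - not the contents") can be illustrated with a minimal sketch. `DemarcatorHolder` and its methods are hypothetical names used only for illustration; they are not part of the PR or of NiFi. The sketch assumes the usual way to work with a volatile array field: replace the whole array on write and hand out copies on read, since writes to individual elements carry no volatile guarantees.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical holder, not taken from the PR: shows what "volatile byte[]" does and does not cover.
public class DemarcatorHolder {

    // Only the reference is volatile. Assigning a new array is visible to other threads;
    // writing to demarcator[i] in place has no such guarantee.
    private volatile byte[] demarcator = new byte[0];

    // Publish a new value by swapping in a freshly built array (a volatile write).
    public void setDemarcator(final String value) {
        demarcator = value.getBytes(StandardCharsets.UTF_8);
    }

    // Read the reference once (a volatile read) and return a copy,
    // so callers cannot mutate the shared array in place.
    public byte[] getDemarcator() {
        final byte[] snapshot = demarcator;
        return snapshot.clone();
    }

    public static void main(final String[] args) {
        final DemarcatorHolder holder = new DemarcatorHolder();
        holder.setDemarcator("\n");
        System.out.println(holder.getDemarcator().length);
    }
}
```

If the field really is unused in favor of the superclass delimiter, none of this is needed and the field can simply be dropped.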


> Add Processor for Lumberjack protocol
> -------------------------------------
>
>                 Key: NIFI-856
>                 URL: https://issues.apache.org/jira/browse/NIFI-856
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Mike de Rhino 
>              Labels: features
>         Attachments: NIFI-856.patch
>
>
> It would be great if NiFi could support the [lumberjack
> protocol|https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md]
> to enable the use of logstash-forwarder as a source of data.
> Many non-Java shops avoid installing Java on data-producing nodes, so
> instead of Flume they end up using tools like Kafka, Heka, Fluentd or
> logstash-forwarder as data shipping mechanisms.
> Kafka is great, but its architecture seems better suited to multi-DC
> environments than to multi-branch scenarios (imagine having to manage 80
> ZooKeeper quorums, one for each country where you operate).
> [Heka|https://github.com/mozilla-services/heka] is fine: it has decent
> backpressure buffering but no concept of acknowledgement on the receiving
> side of a TCP stream. If the other end of a TCP stream is capable of
> listening but gets stuck with its messages, Heka will keep pushing data
> through the pipe, oblivious to the woes at the other end.
> Logstash-forwarder, on the other hand, is quite a simple tool with a
> reasonable implementation of acknowledgments on the receiving side, but it
> depends on Logstash (and Logstash has its own issues).
> It would be great if NiFi could serve as a middleman, receiving lumberjack
> messages and offloading some of the hard work Logstash seems to struggle with
> (e.g. using NiFi to save to HDFS while a downstream Logstash writes into ES).



