[ https://issues.apache.org/jira/browse/NIFI-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094493#comment-16094493 ]

ASF GitHub Bot commented on NIFI-4152:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1987#discussion_r128478338
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java ---
    @@ -0,0 +1,432 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.listen.ListenerProperties;
    +import org.apache.nifi.record.listen.SocketChannelRecordReader;
    +import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher;
    +import org.apache.nifi.security.util.SslContextFactory;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.net.NetworkInterface;
    +import java.nio.channels.ServerSocketChannel;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
    +
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "tcp", "record", "tls", "ssl"})
    +@CapabilityDescription("Listens for incoming TCP connections and reads data from each connection using a configured record " +
    +        "reader, and writes the records to a flow file using a configured record writer. The type of record reader selected will " +
    +        "determine how clients are expected to send data. For example, when using a Grok reader to read logs, a client can keep an " +
    +        "open connection and continuously stream data, but when using a JSON reader, the client cannot send an array of JSON " +
    +        "documents and then send another array on the same connection, as the reader would be in a bad state at that point. Records " +
    +        "will be read from the connection in blocking mode, and will time out according to the Read Timeout specified in the processor. " +
    +        "If the read times out, or if any other error is encountered when reading, the connection will be closed, and any records " +
    +        "read up to that point will be handled according to the configured Read Error Strategy (Discard or Transfer). In cases where " +
    +        "clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of " +
    +        "TCP Connections allowed, so that there is a task processing each connection.")
    +@WritesAttributes({
        @WritesAttribute(attribute="tcp.sender", description="The host that sent the data."),
    +        @WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on."),
    +        @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
    +        @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
    +})
    +public class ListenTCPRecord extends AbstractProcessor {
    +
    +    static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port to listen on for communication.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Read Timeout")
    +            .description("The amount of time to wait before timing out when reading from a connection.")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("30 seconds")
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Buffer")
    +            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +
    +    static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
    +            .name("Max Number of TCP Connections")
    +            .description("The maximum number of concurrent TCP connections to accept.")
    --- End diff --
    
    Should we add to the property description the note you added in the capability description?
    "In cases where clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of TCP Connections allowed, so that there is a task processing each connection."
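    For reference, one way the suggestion could look. This is a minimal, self-contained sketch that only builds the combined description string; the class name `MaxConnectionsDescription` and the standalone form are illustrative, since in the actual processor the string would be passed to `PropertyDescriptor.Builder().description(...)` on the MAX_CONNECTIONS property.

    ```java
    // Illustrative sketch (no NiFi dependency): append the concurrency note
    // from the capability description to the MAX_CONNECTIONS property's
    // own description, as suggested in the review comment above.
    public class MaxConnectionsDescription {
        // Existing description from the pull request.
        static final String BASE =
                "The maximum number of concurrent TCP connections to accept.";

        // Note currently present only in the @CapabilityDescription.
        static final String CONCURRENCY_NOTE =
                "In cases where clients are keeping a connection open, the concurrent tasks "
                + "for the processor should be adjusted to match the Max Number of TCP Connections "
                + "allowed, so that there is a task processing each connection.";

        // The combined text that would be handed to .description(...).
        static String combinedDescription() {
            return BASE + " " + CONCURRENCY_NOTE;
        }

        public static void main(String[] args) {
            System.out.println(combinedDescription());
        }
    }
    ```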


> Create ListenTCPRecord Processor
> --------------------------------
>
>                 Key: NIFI-4152
>                 URL: https://issues.apache.org/jira/browse/NIFI-4152
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>         Attachments: ListenTCPRecordWithGrok.xml
>
>
> We should implement a ListenTCPRecord that can pass the underlying 
> InputStream from a TCP connection to a record reader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
