[
https://issues.apache.org/jira/browse/NIFI-4152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094493#comment-16094493
]
ASF GitHub Bot commented on NIFI-4152:
--------------------------------------
Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1987#discussion_r128478338
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java
---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.record.listen.SocketChannelRecordReader;
+import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.nio.channels.ServerSocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"listen", "tcp", "record", "tls", "ssl"})
+@CapabilityDescription("Listens for incoming TCP connections and reads
data from each connection using a configured record " +
+ "reader, and writes the records to a flow file using a configured
record writer. The type of record reader selected will " +
+ "determine how clients are expected to send data. For example,
when using a Grok reader to read logs, a client can keep an " +
+ "open connection and continuously stream data, but when using an
JSON reader, the client cannot send an array of JSON " +
+ "documents and then send another array on the same connection, as
the reader would be in a bad state at that point. Records " +
+ "will be read from the connection in blocking mode, and will
timeout according to the Read Timeout specified in the processor. " +
+ "If the read times out, or if any other error is encountered when
reading, the connection will be closed, and any records " +
+ "read up to that point will be handled according to the configured
Read Error Strategy (Discard or Transfer). In cases where " +
+ "clients are keeping a connection open, the concurrent tasks for
the processor should be adjusted to match the Max Number of " +
+ "TCP Connections allowed, so that there is a task processing each
connection.")
+@WritesAttributes({
+ @WritesAttribute(attribute="tcp.sender", description="The host
that sent the data."),
+ @WritesAttribute(attribute="tcp.port", description="The port that
the processor accepted the connection on."),
+ @WritesAttribute(attribute="record.count", description="The number
of records written to the flow file."),
+ @WritesAttribute(attribute="mime.type", description="The mime-type
of the writer used to write the records to the flow file.")
+})
+public class ListenTCPRecord extends AbstractProcessor {
+
+ static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port to listen on for communication.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor READ_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Read Timeout")
+ .description("The amount of time to wait before timing out
when reading from a connection.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 seconds")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Size of Socket Buffer")
+ .description("The maximum size of the socket buffer that
should be used. This is a suggestion to the Operating System " +
+ "to indicate how big the socket buffer should be. If
this value is set too low, the buffer may fill up before " +
+ "the data can be read, and incoming data will be
dropped.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor MAX_CONNECTIONS = new
PropertyDescriptor.Builder()
+ .name("Max Number of TCP Connections")
+ .description("The maximum number of concurrent TCP connections
to accept.")
--- End diff --
Should we add in the property description the comment you added in the
capability description?
"In cases where clients are keeping a connection open, the concurrent tasks
for the processor should be adjusted to match the Max Number of TCP Connections
allowed, so that there is a task processing each connection."
> Create ListenTCPRecord Processor
> --------------------------------
>
> Key: NIFI-4152
> URL: https://issues.apache.org/jira/browse/NIFI-4152
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Attachments: ListenTCPRecordWithGrok.xml
>
>
> We should implement a ListenTCPRecord that can pass the underlying
> InputStream from a TCP connection to a record reader.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)