[
https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664275#comment-15664275
]
ASF GitHub Bot commented on NIFI-2615:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1177#discussion_r87830655
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java
---
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.AlreadyConnectedException;
+import java.nio.channels.ConnectionPendingException;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.UnresolvedAddressException;
+import java.nio.channels.UnsupportedAddressTypeException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@SupportsBatching
+@SideEffectFree
+@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Connects over TCP to the provided server. When
receiving data this will writes either the" +
+ " full receive buffer or messages based on demarcator to the
content of a FlowFile. ")
+public class GetTCP extends AbstractProcessor {
+
+ private static final Validator ENDPOINT_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(final String subject, final
String value, final ValidationContext context) {
+ if (null == value || value.isEmpty()) {
+ return new
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject
+ " cannot be empty").build();
+ }
+ //The format should be <host>:<port>{,<host>:<port>}
+ //first split on ,
+ final String[] hostPortPairs = value.split(",");
+ boolean validHostPortPairs = true;
+ String reason = "";
+ String offendingSubject = subject;
+
+ if(0 == hostPortPairs.length){
+ return new
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject
+ " cannot be empty").build();
+ }
+ for (final String hostPortPair : hostPortPairs) {
+ offendingSubject = hostPortPair;
+ //split pair
+ if (hostPortPair.isEmpty()) {
+ validHostPortPairs = false;
+ reason = "endpoint is empty";
+ break;
+ }
+ if (!hostPortPair.contains(":")) {
+ validHostPortPairs = false;
+ reason = "endpoint pair does not contain valid
delimiter";
+ break;
+ }
+ final String[] parts = hostPortPair.split(":");
+
+ if (1 == parts.length) {
+ validHostPortPairs = false;
+ reason = "could not determine the port";
+ break;
+ } else {
+ try {
+ final int intVal = Integer.parseInt(parts[1]);
+
+ if (intVal <= 0) {
+ reason = "not a positive value";
+ validHostPortPairs = false;
+ break;
+ }
+ } catch (final NumberFormatException e) {
+ reason = "not a valid integer";
+ validHostPortPairs = false;
+ break;
+ }
+ }
+ //if we already have a bad pair then exit now.
+ if (!validHostPortPairs) {
+ break;
+ }
+ }
+ return new
ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build();
+ }
+ };
+
+ public static final PropertyDescriptor ENDPOINT_LIST = new
PropertyDescriptor
+ .Builder().name("Endpoint List")
+ .description("A comma delimited list of the servers to connect
to. The format should be " +
+ "<server_address>:<port>. Only one server will be
connected to at a time, the others " +
+ "will be used as fail overs.")
+ .required(true)
+ .addValidator(ENDPOINT_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor FAILOVER_ENDPOINT = new
PropertyDescriptor
+ .Builder().name("Failover Endpoint")
+ .description("A failover server to connect to if one of the
main ones is unreachable after the connection " +
+ "attempt count. The format should be
<server_address>:<port>.")
+ .required(false)
+ // .defaultValue("")
+ .addValidator(ENDPOINT_VALIDATOR)
+ .build();
+
+
+ public static final PropertyDescriptor CONNECTION_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Connection Timeout")
+ .description("The amount of time to wait before timing out
while creating a connection")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("5 sec")
+ .build();
+
+ public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new
PropertyDescriptor.Builder()
+ .name("Connection Attempt Count")
+ .description("The number of times to try and establish a
connection, before using a backup host if available." +
+ " This same attempt count would be used for a backup
host as well.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("3")
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Number of Messages in Batch")
+ .description("The number of messages to write to the flow file
content")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("10")
+ .build();
+
+
+ public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new
PropertyDescriptor
+ .Builder().name("Receive Buffer Size")
+ .description("The size of the buffer to receive data in")
+ .required(false)
+ .defaultValue("2048")
+ .addValidator(StandardValidators.createLongValidator(1, 2048,
true))
+ .build();
+
+ public static final PropertyDescriptor KEEP_ALIVE = new
PropertyDescriptor
+ .Builder().name("Keep Alive")
+ .description("This determines if TCP keep alive is used.")
+ .required(false)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("Success")
+ .description("The relationship that all sucessful messages
from the WebSocket will be sent to")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("Failure")
+ .description("The relationship that all failed messages from
the WebSocket will be sent to")
+ .build();
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+ private final static Charset charset =
Charset.forName(StandardCharsets.UTF_8.name());
+ private final Map<String, String> dynamicAttributes = new HashMap<>();
+ private volatile Set<String> dynamicPropertyNames = new HashSet<>();
+
+ private AtomicBoolean connectedToBackup = new AtomicBoolean();
+
+
+ /*
+ * Will ensure that the list of property descriptors is build only once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(ENDPOINT_LIST);
+ _propertyDescriptors.add(FAILOVER_ENDPOINT);
+ _propertyDescriptors.add(CONNECTION_TIMEOUT);
+ _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
+ _propertyDescriptors.add(BATCH_SIZE);
+ _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
+ _propertyDescriptors.add(KEEP_ALIVE);
+
+
+ propertyDescriptors =
Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
+
+ // Receiver task -> Future of its submission; presumably used to cancel the
+ // receivers when the processor stops (that code is not visible here - confirm).
+ // NOTE(review): raw Future type; Future<?> would be preferable once all uses are checked.
+ private Map<SocketRecveiverThread, Future> socketToFuture = new
HashMap<>();
+ // Thread pool created in onScheduled with one thread per configured endpoint.
+ private ExecutorService executorService;
+ // NOTE(review): this initializer runs at construction time. In NiFi the logger behind
+ // getLogger() is normally set by initialize(), which runs after construction, so this
+ // field may be null - confirm, and prefer calling getLogger() where it is needed.
+ private transient ComponentLog log = getLogger();
+ // Raw Endpoint List property value, read in onScheduled.
+ private transient String originalServerAddressList;
+ // Failover Endpoint property value, read in onScheduled (presumably null when unset).
+ private transient String backupServer;
+ // Number of Messages in Batch property value, read in onScheduled.
+ private transient int batchSize;
+
+ /**
+ * Bounded queue (capacity 256) of messages received from the socket(s);
+ * presumably filled by the receiver threads - confirm.
+ */
+ protected final BlockingQueue<String> socketMessagesReceived = new
ArrayBlockingQueue<>(256);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .required(false)
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
+
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws
ProcessException {
+
+
+ final int rcvBufferSize =
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
+ final boolean keepAlive =
context.getProperty(KEEP_ALIVE).asBoolean();
+ final int connectionTimeout =
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final int connectionRetryCount =
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
+ originalServerAddressList =
context.getProperty(ENDPOINT_LIST).getValue();
+ final String[] serverAddresses =
originalServerAddressList.split(",");
+ backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
+ executorService =
Executors.newFixedThreadPool(serverAddresses.length);
+ batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ for (final Map.Entry<PropertyDescriptor, String> entry :
context.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.isDynamic()) {
+ dynamicAttributes.put(descriptor.getName(),
entry.getValue());
+
+ }
+ }
+
+ //Go through all of the servers that we need to connect to and
establish a connection to each of them
+ //this will result in all of them generating data that goes into
the flow files.
+ for (final String hostPortPair : serverAddresses) {
+ //split pair
+ final String[] hostAndPort = hostPortPair.split(":");
+
+
+ SocketRecveiverThread socketRecveiverThread = null;
+ try {
+ socketRecveiverThread =
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout,
connectionRetryCount, hostAndPort);
+ } catch (Exception ex) {
--- End diff --
This is pretty dangerous, since any kind of blockage in _OnScheduled_ may
result in a TimeoutException, after which _OnScheduled_ will be re-invoked,
essentially attempting to reconnect to things that may already be connected
and now failing with other types of exceptions or errors.
OnScheduled is generally used to set properties and perform other localized
initialization routines. The logic of obtaining the connection should go into
onTrigger() and, in the case of multi-threaded invocation, be synchronized. You
can look at the AMQP as well as the JMS processors for an example.
> Add support for GetTCP processor
> --------------------------------
>
> Key: NIFI-2615
> URL: https://issues.apache.org/jira/browse/NIFI-2615
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.0.0, 0.7.0, 0.6.1
> Reporter: Andrew Psaltis
> Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as
> the client and consuming data. It should provide the following properties:
> * Endpoint - this should accept a list of addresses in the format of
> <Address>:<Port>. If a user wants to track the ingestion rate per address,
> they should configure only one address in this list. However, there are times
> when multiple endpoints represent a single logical entity, and the aggregate
> ingestion rate is representative of it.
> * Failover Endpoint - An endpoint to fail over to if the list of Endpoints is
> exhausted because a connection cannot be made to any of them, or if a
> connection is lost and cannot be re-established.
> * Receive Buffer Size -- The size of the TCP receive buffer to use. This does
> not relate to the size of the content in the resulting flow file.
> * Keep Alive -- This enables TCP keep Alive
> * Connection Timeout -- How long to wait when trying to establish a connection
> * Batch Size - The number of messages to put into a Flow File. This will be
> the max number of messages, as there may be cases where the number of
> messages received over the wire and waiting to be emitted as FlowFile content
> is less than the desired batch size.
> This processor should also support the following:
> 1. If a connection to an endpoint is broken, it should be logged and
> reconnection attempts should be made. Potentially an exponential backoff
> strategy will be used. If there is more than one strategy, they should be
> documented and potentially exposed as an Attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is
> set up in a cluster, this processor needs to ensure that received messages are
> not processed twice. For example, if this processor is configured to point to
> the endpoint (172.31.32.212:10000) and the data flow is running on more than
> one node, then only one node should be processing data. In essence, they should
> form a group with semantics similar to those of a Kafka consumer group.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)