[ 
https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664313#comment-15664313
 ] 

ASF GitHub Bot commented on NIFI-2615:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r87834336
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java
 ---
    @@ -0,0 +1,668 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.net.SocketTimeoutException;
    +import java.net.StandardSocketOptions;
    +import java.nio.ByteBuffer;
    +import java.nio.CharBuffer;
    +import java.nio.channels.AlreadyConnectedException;
    +import java.nio.channels.ConnectionPendingException;
    +import java.nio.channels.SocketChannel;
    +import java.nio.channels.UnresolvedAddressException;
    +import java.nio.channels.UnsupportedAddressTypeException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.CharsetDecoder;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@SupportsBatching
    +@SideEffectFree
    +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Connects over TCP to the provided server. When 
receiving data this will writes either the" +
    +        " full receive buffer or messages based on demarcator to the 
content of a FlowFile. ")
    +public class GetTCP extends AbstractProcessor {
    +
    +    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final 
String value, final ValidationContext context) {
    +            if (null == value || value.isEmpty()) {
    +                return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject
 + " cannot be empty").build();
    +            }
    +            //The format should be <host>:<port>{,<host>:<port>}
    +            //first split on ,
    +            final String[] hostPortPairs = value.split(",");
    +            boolean validHostPortPairs = true;
    +            String reason = "";
    +            String offendingSubject = subject;
    +
    +            if(0 == hostPortPairs.length){
    +                return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject
 + " cannot be empty").build();
    +            }
    +            for (final String hostPortPair : hostPortPairs) {
    +                offendingSubject = hostPortPair;
    +                //split pair
    +                if (hostPortPair.isEmpty()) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint is empty";
    +                    break;
    +                }
    +                if (!hostPortPair.contains(":")) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint pair does not contain valid 
delimiter";
    +                    break;
    +                }
    +                final String[] parts = hostPortPair.split(":");
    +
    +                if (1 == parts.length) {
    +                    validHostPortPairs = false;
    +                    reason = "could not determine the port";
    +                    break;
    +                } else {
    +                    try {
    +                        final int intVal = Integer.parseInt(parts[1]);
    +
    +                        if (intVal <= 0) {
    +                            reason = "not a positive value";
    +                            validHostPortPairs = false;
    +                            break;
    +                        }
    +                    } catch (final NumberFormatException e) {
    +                        reason = "not a valid integer";
    +                        validHostPortPairs = false;
    +                        break;
    +                    }
    +                }
    +                //if we already have a bad pair then exit now.
    +                if (!validHostPortPairs) {
    +                    break;
    +                }
    +            }
    +            return new 
ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build();
    +        }
    +    };
    +
    +    public static final PropertyDescriptor ENDPOINT_LIST = new 
PropertyDescriptor
    +            .Builder().name("Endpoint List")
    +            .description("A comma delimited list of the servers to connect 
to. The format should be " +
    +                    "<server_address>:<port>. Only one server will be 
connected to at a time, the others " +
    +                    "will be used as fail overs.")
    +            .required(true)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FAILOVER_ENDPOINT = new 
PropertyDescriptor
    +            .Builder().name("Failover Endpoint")
    +            .description("A failover server to connect to if one of the 
main ones is unreachable after the connection " +
    +                    "attempt count. The format should be 
<server_address>:<port>.")
    +            .required(false)
    +            // .defaultValue("")
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor CONNECTION_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Connection Timeout")
    +            .description("The amount of time to wait before timing out 
while creating a connection")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("5 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new 
PropertyDescriptor.Builder()
    +            .name("Connection Attempt Count")
    +            .description("The number of times to try and establish a 
connection, before using a backup host if available." +
    +                    " This same attempt count would be used for a backup 
host as well.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("3")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Number of Messages in Batch")
    +            .description("The number of messages to write to the flow file 
content")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +
    +    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new 
PropertyDescriptor
    +            .Builder().name("Receive Buffer Size")
    +            .description("The size of the buffer to receive data in")
    +            .required(false)
    +            .defaultValue("2048")
    +            .addValidator(StandardValidators.createLongValidator(1, 2048, 
true))
    +            .build();
    +
    +    public static final PropertyDescriptor KEEP_ALIVE = new 
PropertyDescriptor
    +            .Builder().name("Keep Alive")
    +            .description("This determines if TCP keep alive is used.")
    +            .required(false)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("Success")
    +            .description("The relationship that all sucessful messages 
from the WebSocket will be sent to")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("Failure")
    +            .description("The relationship that all failed messages from 
the WebSocket will be sent to")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +    private final static Charset charset = 
Charset.forName(StandardCharsets.UTF_8.name());
    +    private final Map<String, String> dynamicAttributes = new HashMap<>();
    +    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    +
    +    private AtomicBoolean connectedToBackup = new AtomicBoolean();
    +
    +
    +    /*
    +    * Will ensure that the list of property descriptors is build only once.
    +    * Will also create a Set of relationships
    +    */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ENDPOINT_LIST);
    +        _propertyDescriptors.add(FAILOVER_ENDPOINT);
    +        _propertyDescriptors.add(CONNECTION_TIMEOUT);
    +        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
    +        _propertyDescriptors.add(KEEP_ALIVE);
    +
    +
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    private Map<SocketRecveiverThread, Future> socketToFuture = new 
HashMap<>();
    +    private ExecutorService executorService;
    +    private transient ComponentLog log = getLogger();
    +    private transient String originalServerAddressList;
    +    private transient String backupServer;
    +    private transient int batchSize;
    +
    +    /**
    +     * Bounded queue of messages events from the socket.
    +     */
    +    protected final BlockingQueue<String> socketMessagesReceived = new 
ArrayBlockingQueue<>(256);
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
ProcessException {
    +
    +
    +        final int rcvBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
    +        final boolean keepAlive = 
context.getProperty(KEEP_ALIVE).asBoolean();
    +        final int connectionTimeout = 
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int connectionRetryCount = 
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
    +        originalServerAddressList = 
context.getProperty(ENDPOINT_LIST).getValue();
    +        final String[] serverAddresses = 
originalServerAddressList.split(",");
    +        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
    +        executorService = 
Executors.newFixedThreadPool(serverAddresses.length);
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                dynamicAttributes.put(descriptor.getName(), 
entry.getValue());
    +
    +            }
    +        }
    +
    +        //Go through all of the servers that we need to connect to and 
establish a connection to each of them
    +        //this will result in all of them generating data that goes into 
the flow files.
    +        for (final String hostPortPair : serverAddresses) {
    +            //split pair
    +            final String[] hostAndPort = hostPortPair.split(":");
    +
    +
    +            SocketRecveiverThread socketRecveiverThread = null;
    +            try {
    +                socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, hostAndPort);
    +            } catch (Exception ex) {
    --- End diff --
    
    Also, it grows through the list synchronously, so it may block for a long 
time when it can't connect to a particular server while it is connected to 
others and ready to receive messages. 
    Personally I need to better understand the requirement on this. I do see a 
value of receiving from multiple servers, but I could easily do it with 
multiple GetTCP processors and have much better control over not only the 
connectivity and it's failure, but also security (e.g., SSL) and other protocol 
specific properties.


> Add support for GetTCP processor
> --------------------------------
>
>                 Key: NIFI-2615
>                 URL: https://issues.apache.org/jira/browse/NIFI-2615
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0, 0.6.1
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as 
> the client and consume data. This should provide the following properties:
> * Endpoint -  this should accept a list of addresses in the format of 
> <Address>:<Port> - if a user wants to be able to track the ingestion rate per 
> address then you would want to have one address in this list. However, there 
> are times when multiple endpoints represent a logical entity and the 
> aggregate ingestion rate is representative of it. 
> * Failover Endpoint - An endpoint to fall over to if the list of Endpoints is 
> exhausted and a connection cannot be made to them or it is disconnected and 
> cannot reconnect.
> * Receive Buffer Size -- The size of the TCP receive buffer to use. This does 
> not related to the size of content in the resulting flow file.
> * Keep Alive -- This enables TCP keep Alive
> * Connection Timeout -- How long to wait when trying to establish a connection
> * Batch Size - The number of messages to put into a Flow File. This will be 
> the max number of messages, as there may be cases where the number of 
> messages received over the wire waiting to be emitted as FF content may be 
> less then the desired batch.
> This processor should also support the following:
> 1. If a connection to end endpoint is broken, it should be logged and 
> reconnections to it should be made. Potentially an exponential backoff 
> strategy will be used. The strategy if there is more than one should be 
> documented and potentially exposed as an Attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is 
> setup in a cluster, this processor needs to ensure that received messages are 
> not dual processed. For example if this processor is configured to point to 
> the endpoint (172.31.32.212:10000) and the data flow is running on more than 
> one node then only one node should be processing data. In essence they should 
> form a group and have similar semantics as a Kafka consumer group does.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to