[ 
https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15633300#comment-15633300
 ] 

ASF GitHub Bot commented on NIFI-2615:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r86380087
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java
 ---
    @@ -0,0 +1,668 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.net.SocketTimeoutException;
    +import java.net.StandardSocketOptions;
    +import java.nio.ByteBuffer;
    +import java.nio.CharBuffer;
    +import java.nio.channels.AlreadyConnectedException;
    +import java.nio.channels.ConnectionPendingException;
    +import java.nio.channels.SocketChannel;
    +import java.nio.channels.UnresolvedAddressException;
    +import java.nio.channels.UnsupportedAddressTypeException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.CharsetDecoder;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@SupportsBatching
    +@SideEffectFree
    +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Connects over TCP to the provided server. When 
receiving data this will writes either the" +
    +        " full receive buffer or messages based on demarcator to the 
content of a FlowFile. ")
    +public class GetTCP extends AbstractProcessor {
    +
    +    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final 
String value, final ValidationContext context) {
    +            if (null == value || value.isEmpty()) {
    +                return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject
 + " cannot be empty").build();
    +            }
    +            //The format should be <host>:<port>{,<host>:<port>}
    +            //first split on ,
    +            final String[] hostPortPairs = value.split(",");
    +            boolean validHostPortPairs = true;
    +            String reason = "";
    +            String offendingSubject = subject;
    +
    +            if(0 == hostPortPairs.length){
    +                return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(offendingSubject
 + " cannot be empty").build();
    +            }
    +            for (final String hostPortPair : hostPortPairs) {
    +                offendingSubject = hostPortPair;
    +                //split pair
    +                if (hostPortPair.isEmpty()) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint is empty";
    +                    break;
    +                }
    +                if (!hostPortPair.contains(":")) {
    +                    validHostPortPairs = false;
    +                    reason = "endpoint pair does not contain valid 
delimiter";
    +                    break;
    +                }
    +                final String[] parts = hostPortPair.split(":");
    +
    +                if (1 == parts.length) {
    +                    validHostPortPairs = false;
    +                    reason = "could not determine the port";
    +                    break;
    +                } else {
    +                    try {
    +                        final int intVal = Integer.parseInt(parts[1]);
    +
    +                        if (intVal <= 0) {
    +                            reason = "not a positive value";
    +                            validHostPortPairs = false;
    +                            break;
    +                        }
    +                    } catch (final NumberFormatException e) {
    +                        reason = "not a valid integer";
    +                        validHostPortPairs = false;
    +                        break;
    +                    }
    +                }
    +                //if we already have a bad pair then exit now.
    +                if (!validHostPortPairs) {
    +                    break;
    +                }
    +            }
    +            return new 
ValidationResult.Builder().subject(offendingSubject).input(value).explanation(reason).valid(validHostPortPairs).build();
    +        }
    +    };
    +
    +    public static final PropertyDescriptor ENDPOINT_LIST = new 
PropertyDescriptor
    +            .Builder().name("Endpoint List")
    +            .description("A comma delimited list of the servers to connect 
to. The format should be " +
    +                    "<server_address>:<port>. Only one server will be 
connected to at a time, the others " +
    +                    "will be used as fail overs.")
    +            .required(true)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor FAILOVER_ENDPOINT = new 
PropertyDescriptor
    +            .Builder().name("Failover Endpoint")
    +            .description("A failover server to connect to if one of the 
main ones is unreachable after the connection " +
    +                    "attempt count. The format should be 
<server_address>:<port>.")
    +            .required(false)
    +            // .defaultValue("")
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor CONNECTION_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Connection Timeout")
    +            .description("The amount of time to wait before timing out 
while creating a connection")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("5 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new 
PropertyDescriptor.Builder()
    +            .name("Connection Attempt Count")
    +            .description("The number of times to try and establish a 
connection, before using a backup host if available." +
    +                    " This same attempt count would be used for a backup 
host as well.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("3")
    +            .build();
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Number of Messages in Batch")
    +            .description("The number of messages to write to the flow file 
content")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +
    +    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new 
PropertyDescriptor
    +            .Builder().name("Receive Buffer Size")
    +            .description("The size of the buffer to receive data in")
    +            .required(false)
    +            .defaultValue("2048")
    +            .addValidator(StandardValidators.createLongValidator(1, 2048, 
true))
    +            .build();
    +
    +    public static final PropertyDescriptor KEEP_ALIVE = new 
PropertyDescriptor
    +            .Builder().name("Keep Alive")
    +            .description("This determines if TCP keep alive is used.")
    +            .required(false)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("Success")
    +            .description("The relationship that all sucessful messages 
from the WebSocket will be sent to")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("Failure")
    +            .description("The relationship that all failed messages from 
the WebSocket will be sent to")
    +            .build();
    +
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +    private final static Charset charset = 
Charset.forName(StandardCharsets.UTF_8.name());
    +    private final Map<String, String> dynamicAttributes = new HashMap<>();
    +    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    +
    +    private AtomicBoolean connectedToBackup = new AtomicBoolean();
    +
    +
    +    /*
    +    * Will ensure that the list of property descriptors is build only once.
    +    * Will also create a Set of relationships
    +    */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ENDPOINT_LIST);
    +        _propertyDescriptors.add(FAILOVER_ENDPOINT);
    +        _propertyDescriptors.add(CONNECTION_TIMEOUT);
    +        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
    +        _propertyDescriptors.add(KEEP_ALIVE);
    +
    +
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    private Map<SocketRecveiverThread, Future> socketToFuture = new 
HashMap<>();
    +    private ExecutorService executorService;
    +    private ComponentLog log = getLogger();
    +    private String originalServerAddressList;
    +    private String backupServer;
    +    private int batchSize;
    +
    +    /**
    +     * Bounded queue of messages events from the socket.
    +     */
    +    protected final BlockingQueue<String> socketMessagesReceived = new 
ArrayBlockingQueue<>(256);
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
ProcessException {
    +
    +
    +        final int rcvBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
    +        final boolean keepAlive = 
context.getProperty(KEEP_ALIVE).asBoolean();
    +        final int connectionTimeout = 
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int connectionRetryCount = 
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
    +        originalServerAddressList = 
context.getProperty(ENDPOINT_LIST).getValue();
    +        final String[] serverAddresses = 
originalServerAddressList.split(",");
    +        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
    +        executorService = 
Executors.newFixedThreadPool(serverAddresses.length);
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                dynamicAttributes.put(descriptor.getName(), 
entry.getValue());
    +
    +            }
    +        }
    +
    +        //Go through all of the servers that we need to connect to and 
establish a connection to each of them
    +        //this will result in all of them generating data that goes into 
the flow files.
    +        for (final String hostPortPair : serverAddresses) {
    +            //split pair
    +            final String[] hostAndPort = hostPortPair.split(":");
    +
    +
    +            SocketRecveiverThread socketRecveiverThread = null;
    +            try {
    +                socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, hostAndPort);
    +            } catch (Exception ex) {
    +                // Should we re-throw this or throw a ProcessException? If 
there is only one host in the list  and no backup
    +                // then we should otherwise there will be no data and we 
cannot really run. For a single hosts / port
    +                // pair it may make sense to throw as well and then 
perhaps provide a property to allow the user to decide
    +                // if there are more than one servers in the list do we 
throw or just log it. For now will throw and
    +                // treat it as an error.
    +                log = getLogger();
    +                log.error("Caught exception trying to create thread to 
process data from host: {}", new Object[]{hostAndPort[0], ex});
    +
    +                if (!connectedToBackup.get() && !backupServer.isEmpty()) {
    +                    log.error("Attempting connection to backup server: 
{}", new Object[]{backupServer});
    +                    try {
    +                        final String[] backupHostAndPort = 
backupServer.split(":");
    +                        socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, backupHostAndPort);
    +                        connectedToBackup.set(true);
    +                    } catch (Exception backupException) {
    +
    +                        log.error("Caught exception trying to connect to 
backup server: ", new Object[]{backupServer, ex});
    +                        throw new ProcessException(String.format("Caught 
exception trying to create thread to process " +
    +                                "data from backup server: %s", 
backupServer), backupException);
    +                    }
    +                } else {
    +                    final String connectionMsg = connectedToBackup.get() ? 
String.format("Already connected to backup %s", backupServer) :
    +                            "No backup server configured to connect to";
    +                    log.info(connectionMsg);
    +                    throw new ProcessException(String.format("Caught 
exception trying to create thread to process data from host: %s", 
hostAndPort[0]), ex);
    +                }
    +            }
    +
    +            getLogger().info("Created thread to process data from host: 
{}", new Object[]{hostAndPort[0]});
    +            final Future socketReceiverFuture = 
executorService.submit(socketRecveiverThread);
    +            socketToFuture.put(socketRecveiverThread, 
socketReceiverFuture);
    +
    +
    +        }
    +
    +    }
    +
    +    protected SocketRecveiverThread createSocketRecveiverThread(int 
rcvBufferSize, boolean keepAlive, int connectionTimeout, int 
connectionRetryCount, String[] hostAndPort) {
    +        SocketRecveiverThread socketRecveiverThread;
    +        socketRecveiverThread = new SocketRecveiverThread(hostAndPort[0],
    +                Integer.parseInt(hostAndPort[1]), backupServer, keepAlive, 
rcvBufferSize, connectionTimeout, connectionRetryCount);
    +        return socketRecveiverThread;
    +    }
    +
    +    @OnStopped
    +    public void tearDown() throws ProcessException {
    +        try {
    +            for (Map.Entry<SocketRecveiverThread, Future> socketAndFuture 
: socketToFuture.entrySet()) {
    +                socketAndFuture.getKey().stopProcessing();
    +                socketAndFuture.getValue().cancel(true);
    +
    +            }
    +            if (null != executorService) {
    +                executorService.shutdown();
    +                if (!executorService.awaitTermination(5, 
TimeUnit.SECONDS)) {
    +                    executorService.shutdownNow();
    +
    +                    if (!executorService.awaitTermination(5, 
TimeUnit.SECONDS))
    +                        log.error("Executor service for receiver thread 
did not terminate");
    +                }
    +            }
    +        } catch (final Exception e) {
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        try {
    +            final StringBuilder messages = new StringBuilder();
    +            if (socketMessagesReceived.size() >= batchSize) {
    +                for (int i = 0; i < batchSize; i++) {
    +                    messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +                }
    +            } else {
    +                messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +            }
    +            if (0 == messages.length()) {
    +                return;
    +            }
    +            FlowFile flowFile = session.create();
    +
    +            flowFile = session.write(flowFile, out -> 
out.write(messages.toString().getBytes()));
    +            //need to at least put the list of hosts and ports, would be 
nice to know the exact
    +            //host the message came frome -- to do that we need to do one 
of two things
    +            //
    +            // 1. Have a message queue per server
    +            // 2. Add something to the message that then needs to be 
parsed.
    +            //
    +            // Both of these are not ideal, for now just show the whole 
list and someone can still use a
    +            // ROA processor and see if the attribute contains a host.
    +            flowFile = session.putAttribute(flowFile, "ServerList", 
originalServerAddressList);
    +            //add any dynamic properties
    +            if (0 < dynamicAttributes.size()) {
    +                flowFile = session.putAllAttributes(flowFile, 
dynamicAttributes);
    +            }
    +            session.transfer(flowFile, REL_SUCCESS);
    +
    +        } catch (InterruptedException exception) {
    +            throw new ProcessException(exception);
    --- End diff --
    
    The InterruptedException is usually the result if abnormal shutdown and not 
an application error. All we should to do here  is log (optional) and reset the 
interrupt flag for the current thread so other component in the call stack can 
react:
    ```
    } catch (InterruptedException exception) {
                this.getLogger().warn("Current thread is interrupted");
                Thread.currentThread().interrupt();
    }
    ```


> Add support for GetTCP processor
> --------------------------------
>
>                 Key: NIFI-2615
>                 URL: https://issues.apache.org/jira/browse/NIFI-2615
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0, 0.6.1
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as 
> the client and consume data. This should provide the following properties:
> * Endpoint -  this should accept a list of addresses in the format of 
> <Address>:<Port> - if a user wants to be able to track the ingestion rate per 
> address then you would want to have one address in this list. However, there 
> are times when multiple endpoints represent a logical entity and the 
> aggregate ingestion rate is representative of it. 
> * Failover Endpoint - An endpoint to fall over to if the list of Endpoints is 
> exhausted and a connection cannot be made to them or it is disconnected and 
> cannot reconnect.
> * Receive Buffer Size -- The size of the TCP receive buffer to use. This does 
> not related to the size of content in the resulting flow file.
> * Keep Alive -- This enables TCP keep Alive
> * Connection Timeout -- How long to wait when trying to establish a connection
> * Batch Size - The number of messages to put into a Flow File. This will be 
> the max number of messages, as there may be cases where the number of 
> messages received over the wire waiting to be emitted as FF content may be 
> less then the desired batch.
> This processor should also support the following:
> 1. If a connection to end endpoint is broken, it should be logged and 
> reconnections to it should be made. Potentially an exponential backoff 
> strategy will be used. The strategy if there is more than one should be 
> documented and potentially exposed as an Attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is 
> setup in a cluster, this processor needs to ensure that received messages are 
> not dual processed. For example if this processor is configured to point to 
> the endpoint (172.31.32.212:10000) and the data flow is running on more than 
> one node then only one node should be processing data. In essence they should 
> form a group and have similar semantics as a Kafka consumer group does.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to