[ 
https://issues.apache.org/jira/browse/NIFI-2615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648897#comment-15648897
 ] 

ASF GitHub Bot commented on NIFI-2615:
--------------------------------------

Github user apsaltis commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r87092120
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java
 ---
    @@ -0,0 +1,668 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.net.SocketTimeoutException;
    +import java.net.StandardSocketOptions;
    +import java.nio.ByteBuffer;
    +import java.nio.CharBuffer;
    +import java.nio.channels.AlreadyConnectedException;
    +import java.nio.channels.ConnectionPendingException;
    +import java.nio.channels.SocketChannel;
    +import java.nio.channels.UnresolvedAddressException;
    +import java.nio.channels.UnsupportedAddressTypeException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.CharsetDecoder;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@SupportsBatching
    +@SideEffectFree
    +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Connects over TCP to the provided server. When 
receiving data this will writes either the" +
    +        " full receive buffer or messages based on demarcator to the 
content of a FlowFile. ")
    +public class GetTCP extends AbstractProcessor {
    +
    +    /**
    +     * Validates a comma separated list of endpoints in the form {@code <host>:<port>{,<host>:<port>}}.
    +     * Every entry must be non-empty, contain a ':' delimiter, have a non-empty host and a port in the
    +     * TCP port range 1-65535. Validation stops at the first bad entry, which becomes the reported subject.
    +     */
    +    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
    +            if (null == value || value.isEmpty()) {
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
    +            }
    +            // String.split drops trailing empty strings, so a value such as "," yields no entries at all.
    +            final String[] hostPortPairs = value.split(",");
    +            if (0 == hostPortPairs.length) {
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
    +            }
    +
    +            String offendingSubject = subject;
    +            String reason = null;
    +            for (final String hostPortPair : hostPortPairs) {
    +                offendingSubject = hostPortPair;
    +                reason = validateHostPortPair(hostPortPair);
    +                if (reason != null) {
    +                    // Report only the first bad entry.
    +                    break;
    +                }
    +            }
    +            return new ValidationResult.Builder().subject(offendingSubject).input(value)
    +                    .explanation(reason == null ? "" : reason).valid(reason == null).build();
    +        }
    +
    +        /**
    +         * Checks a single {@code <host>:<port>} entry.
    +         *
    +         * @param hostPortPair one entry of the comma separated list
    +         * @return a description of the problem, or {@code null} when the entry is valid
    +         */
    +        private String validateHostPortPair(final String hostPortPair) {
    +            if (hostPortPair.isEmpty()) {
    +                return "endpoint is empty";
    +            }
    +            if (!hostPortPair.contains(":")) {
    +                return "endpoint pair does not contain valid delimiter";
    +            }
    +            final String[] parts = hostPortPair.split(":");
    +            // Fewer than two parts covers both "host:" and ":" (the latter splits into an empty array,
    +            // so indexing parts[1] without this check would throw ArrayIndexOutOfBoundsException).
    +            if (parts.length < 2) {
    +                return "could not determine the port";
    +            }
    +            if (parts[0].isEmpty()) {
    +                return "could not determine the host";
    +            }
    +            final int port;
    +            try {
    +                port = Integer.parseInt(parts[1]);
    +            } catch (final NumberFormatException e) {
    +                return "not a valid integer";
    +            }
    +            if (port <= 0) {
    +                return "not a positive value";
    +            }
    +            if (port > 65535) {
    +                return "not a valid port, must be between 1 and 65535";
    +            }
    +            return null;
    +        }
    +    };
    +
    +    /**
    +     * Comma separated list of {@code <server_address>:<port>} endpoints; a receiver is started for each one.
    +     */
    +    public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor
    +            .Builder().name("Endpoint List")
    +            .description("A comma delimited list of the servers to connect to. The format should be " +
    +                    "<server_address>:<port>. A connection is established to each server in the list and " +
    +                    "the data received from all of them is written to the outgoing FlowFiles.")
    +            .required(true)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Optional endpoint used in place of a list endpoint whose receiver could not be created.
    +     */
    +    public static final PropertyDescriptor FAILOVER_ENDPOINT = new PropertyDescriptor
    +            .Builder().name("Failover Endpoint")
    +            .description("A failover server to connect to if one of the main ones is unreachable after the connection " +
    +                    "attempt count. The format should be <server_address>:<port>.")
    +            .required(false)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * How long to wait while establishing a connection (default 5 sec).
    +     */
    +    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Connection Timeout")
    +            .description("The amount of time to wait before timing out while creating a connection")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("5 sec")
    +            .build();
    +
    +    /**
    +     * Number of connection attempts per endpoint (default 3).
    +     */
    +    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
    +            .name("Connection Attempt Count")
    +            .description("The number of times to try and establish a connection, before using a backup host if available." +
    +                    " This same attempt count would be used for a backup host as well.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("3")
    +            .build();
    +
    +    /**
    +     * Maximum number of received messages written into one FlowFile (default 10).
    +     */
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Number of Messages in Batch")
    +            .description("The number of messages to write to the flow file content")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    /**
    +     * Receive buffer size; the validator restricts it to the range 1-2048 (default 2048).
    +     */
    +    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor
    +            .Builder().name("Receive Buffer Size")
    +            .description("The size of the buffer to receive data in")
    +            .required(false)
    +            .defaultValue("2048")
    +            .addValidator(StandardValidators.createLongValidator(1, 2048, true))
    +            .build();
    +
    +    /**
    +     * Whether TCP keep-alive is enabled on the connections (default true).
    +     */
    +    public static final PropertyDescriptor KEEP_ALIVE = new PropertyDescriptor
    +            .Builder().name("Keep Alive")
    +            .description("This determines if TCP keep alive is used.")
    +            .required(false)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Relationship that onTrigger transfers the FlowFiles built from received messages to.
    +     */
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("Success")
    +            .description("The relationship that all successful messages from the TCP server(s) will be sent to")
    +            .build();
    +
    +    /**
    +     * Relationship declared for failed messages.
    +     * NOTE(review): no transfer to this relationship is visible in the reviewed part of the file.
    +     */
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("Failure")
    +            .description("The relationship that all failed messages from the TCP server(s) will be sent to")
    +            .build();
    +
    +    // Immutable list/set, assigned once in the static initializer.
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    private static final Set<Relationship> relationships;
    +    // UTF-8 is always available, so use the constant directly instead of looking it up by name.
    +    private static final Charset charset = StandardCharsets.UTF_8;
    +    // Dynamic property name -> value, filled in onScheduled and added to each FlowFile in onTrigger.
    +    private final Map<String, String> dynamicAttributes = new HashMap<>();
    +    // NOTE(review): not referenced in the reviewed part of this file.
    +    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    +
    +    // Set once the failover endpoint has replaced a failed endpoint, so it is only used once per run.
    +    private AtomicBoolean connectedToBackup = new AtomicBoolean();
    +
    +
    +    /*
    +    * Will ensure that the list of property descriptors is build only once.
    +    * Will also create a Set of relationships
    +    */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ENDPOINT_LIST);
    +        _propertyDescriptors.add(FAILOVER_ENDPOINT);
    +        _propertyDescriptors.add(CONNECTION_TIMEOUT);
    +        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
    +        _propertyDescriptors.add(KEEP_ALIVE);
    +
    +
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    /**
    +     * Bounded queue of messages events from the socket (capacity 256), drained by onTrigger().
    +     */
    +    protected final BlockingQueue<String> socketMessagesReceived = new ArrayBlockingQueue<>(256);
    +
    +    // --- configuration captured in onScheduled() ---
    +    // Raw Endpoint List value; written to each FlowFile as the "ServerList" attribute.
    +    private String originalServerAddressList;
    +    // Failover Endpoint value; presumably null when the optional property is not set.
    +    private String backupServer;
    +    // Upper bound on the number of queued messages written into a single FlowFile.
    +    private int batchSize;
    +
    +    // --- runtime state ---
    +    // One thread per entry in the Endpoint List; created in onScheduled().
    +    private ExecutorService executorService;
    +    // Receiver task -> Future returned on submission; used by tearDown() to stop and cancel it.
    +    private Map<SocketRecveiverThread, Future> socketToFuture = new HashMap<>();
    +    // NOTE(review): initialised from getLogger() at construction time; the framework logger may not be
    +    // available yet then, so this could be null until onScheduled() reassigns it - confirm.
    +    private ComponentLog log = getLogger();
    +
    +    /**
    +     * @return the unmodifiable set of relationships built in the static initializer (Success, Failure)
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return GetTCP.relationships;
    +    }
    +
    +    /**
    +     * @return the unmodifiable, ordered list of supported properties built in the static initializer
    +     */
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return GetTCP.propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
ProcessException {
    +
    +
    +        final int rcvBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
    +        final boolean keepAlive = 
context.getProperty(KEEP_ALIVE).asBoolean();
    +        final int connectionTimeout = 
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int connectionRetryCount = 
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
    +        originalServerAddressList = 
context.getProperty(ENDPOINT_LIST).getValue();
    +        final String[] serverAddresses = 
originalServerAddressList.split(",");
    +        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
    +        executorService = 
Executors.newFixedThreadPool(serverAddresses.length);
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                dynamicAttributes.put(descriptor.getName(), 
entry.getValue());
    +
    +            }
    +        }
    +
    +        //Go through all of the servers that we need to connect to and 
establish a connection to each of them
    +        //this will result in all of them generating data that goes into 
the flow files.
    +        for (final String hostPortPair : serverAddresses) {
    +            //split pair
    +            final String[] hostAndPort = hostPortPair.split(":");
    +
    +
    +            SocketRecveiverThread socketRecveiverThread = null;
    +            try {
    +                socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, hostAndPort);
    +            } catch (Exception ex) {
    +                // Should we re-throw this or throw a ProcessException? If 
there is only one host in the list  and no backup
    +                // then we should otherwise there will be no data and we 
cannot really run. For a single hosts / port
    +                // pair it may make sense to throw as well and then 
perhaps provide a property to allow the user to decide
    +                // if there are more than one servers in the list do we 
throw or just log it. For now will throw and
    +                // treat it as an error.
    +                log = getLogger();
    +                log.error("Caught exception trying to create thread to 
process data from host: {}", new Object[]{hostAndPort[0], ex});
    +
    +                if (!connectedToBackup.get() && !backupServer.isEmpty()) {
    +                    log.error("Attempting connection to backup server: 
{}", new Object[]{backupServer});
    +                    try {
    +                        final String[] backupHostAndPort = 
backupServer.split(":");
    +                        socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, backupHostAndPort);
    +                        connectedToBackup.set(true);
    +                    } catch (Exception backupException) {
    +
    +                        log.error("Caught exception trying to connect to 
backup server: ", new Object[]{backupServer, ex});
    +                        throw new ProcessException(String.format("Caught 
exception trying to create thread to process " +
    +                                "data from backup server: %s", 
backupServer), backupException);
    +                    }
    +                } else {
    +                    final String connectionMsg = connectedToBackup.get() ? 
String.format("Already connected to backup %s", backupServer) :
    +                            "No backup server configured to connect to";
    +                    log.info(connectionMsg);
    +                    throw new ProcessException(String.format("Caught 
exception trying to create thread to process data from host: %s", 
hostAndPort[0]), ex);
    +                }
    +            }
    +
    +            getLogger().info("Created thread to process data from host: 
{}", new Object[]{hostAndPort[0]});
    +            final Future socketReceiverFuture = 
executorService.submit(socketRecveiverThread);
    +            socketToFuture.put(socketRecveiverThread, 
socketReceiverFuture);
    +
    +
    +        }
    +
    +    }
    +
    +    protected SocketRecveiverThread createSocketRecveiverThread(int 
rcvBufferSize, boolean keepAlive, int connectionTimeout, int 
connectionRetryCount, String[] hostAndPort) {
    +        SocketRecveiverThread socketRecveiverThread;
    +        socketRecveiverThread = new SocketRecveiverThread(hostAndPort[0],
    +                Integer.parseInt(hostAndPort[1]), backupServer, keepAlive, 
rcvBufferSize, connectionTimeout, connectionRetryCount);
    +        return socketRecveiverThread;
    +    }
    +
    +    /**
    +     * Stops every receiver task, cancels its Future and shuts down the executor. It waits up to 5 seconds
    +     * for a graceful shutdown and another 5 seconds after forcing it. Per-run state (the receiver map and
    +     * the "failover already used" flag) is always reset so the processor can be scheduled again cleanly.
    +     *
    +     * @throws ProcessException if stopping a receiver fails or the shutdown wait is interrupted
    +     */
    +    @OnStopped
    +    public void tearDown() throws ProcessException {
    +        try {
    +            for (Map.Entry<SocketRecveiverThread, Future> socketAndFuture : socketToFuture.entrySet()) {
    +                socketAndFuture.getKey().stopProcessing();
    +                socketAndFuture.getValue().cancel(true);
    +            }
    +            if (null != executorService) {
    +                executorService.shutdown();
    +                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
    +                    executorService.shutdownNow();
    +                    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
    +                        // Use getLogger() rather than the 'log' field, which is initialised at construction
    +                        // time and may not hold the framework logger.
    +                        getLogger().error("Executor service for receiver thread did not terminate");
    +                    }
    +                }
    +            }
    +        } catch (final InterruptedException e) {
    +            // Preserve the interrupt status for the calling framework thread.
    +            Thread.currentThread().interrupt();
    +            throw new ProcessException(e);
    +        } catch (final Exception e) {
    +            throw new ProcessException(e);
    +        } finally {
    +            // Forget this run's receivers so a restart does not stop/cancel them again, and allow the
    +            // failover endpoint to be used again on the next run.
    +            socketToFuture.clear();
    +            connectedToBackup.set(false);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        try {
    +            final StringBuilder messages = new StringBuilder();
    +            if (socketMessagesReceived.size() >= batchSize) {
    +                for (int i = 0; i < batchSize; i++) {
    +                    messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +                }
    +            } else {
    +                messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +            }
    +            if (0 == messages.length()) {
    +                return;
    +            }
    +            FlowFile flowFile = session.create();
    +
    +            flowFile = session.write(flowFile, out -> 
out.write(messages.toString().getBytes()));
    +            //need to at least put the list of hosts and ports, would be 
nice to know the exact
    +            //host the message came frome -- to do that we need to do one 
of two things
    +            //
    +            // 1. Have a message queue per server
    +            // 2. Add something to the message that then needs to be 
parsed.
    +            //
    +            // Both of these are not ideal, for now just show the whole 
list and someone can still use a
    +            // ROA processor and see if the attribute contains a host.
    +            flowFile = session.putAttribute(flowFile, "ServerList", 
originalServerAddressList);
    +            //add any dynamic properties
    +            if (0 < dynamicAttributes.size()) {
    +                flowFile = session.putAllAttributes(flowFile, 
dynamicAttributes);
    +            }
    +            session.transfer(flowFile, REL_SUCCESS);
    +
    +        } catch (InterruptedException exception) {
    +            throw new ProcessException(exception);
    --- End diff --
    
    Sounds good. Will change the code accordingly


> Add support for GetTCP processor
> --------------------------------
>
>                 Key: NIFI-2615
>                 URL: https://issues.apache.org/jira/browse/NIFI-2615
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.0.0, 0.7.0, 0.6.1
>            Reporter: Andrew Psaltis
>            Assignee: Andrew Psaltis
>
> This processor will allow NiFi to connect to a host via TCP, thus acting as 
> the client and consume data. This should provide the following properties:
> * Endpoint - This should accept a list of addresses in the format
> <Address>:<Port>. If a user wants to track the ingestion rate per address,
> they should put only one address in this list. However, there are times when
> multiple endpoints represent a single logical entity and the aggregate
> ingestion rate is representative of that entity.
> * Failover Endpoint - An endpoint to fail over to if the list of Endpoints is
> exhausted and a connection cannot be made to any of them, or if a connection
> is lost and cannot be re-established.
> * Receive Buffer Size -- The size of the TCP receive buffer to use. This is
> not related to the size of the content in the resulting flow file.
> * Keep Alive -- This enables TCP keep Alive
> * Connection Timeout -- How long to wait when trying to establish a connection
> * Batch Size - The number of messages to put into a Flow File. This will be
> the maximum number of messages, as the number of messages received over the
> wire and waiting to be emitted as FlowFile content may be less than the
> desired batch size.
> This processor should also support the following:
> 1. If a connection to an endpoint is broken, it should be logged and
> reconnection attempts should be made, potentially using an exponential backoff
> strategy. If there is more than one strategy, the strategy in use should be
> documented and potentially exposed as an attribute.
> 2. When there are multiple instances of this processor in a flow and NiFi is
> set up as a cluster, this processor needs to ensure that received messages are
> not processed more than once. For example, if this processor is configured to
> point to the endpoint (172.31.32.212:10000) and the data flow is running on more
> than one node, then only one node should be processing data. In essence, the
> instances should form a group with semantics similar to a Kafka consumer group.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to