Github user apsaltis commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1177#discussion_r87092120
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetTCP.java
 ---
    @@ -0,0 +1,668 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.Socket;
    +import java.net.SocketAddress;
    +import java.net.SocketTimeoutException;
    +import java.net.StandardSocketOptions;
    +import java.nio.ByteBuffer;
    +import java.nio.CharBuffer;
    +import java.nio.channels.AlreadyConnectedException;
    +import java.nio.channels.ConnectionPendingException;
    +import java.nio.channels.SocketChannel;
    +import java.nio.channels.UnresolvedAddressException;
    +import java.nio.channels.UnsupportedAddressTypeException;
    +import java.nio.charset.Charset;
    +import java.nio.charset.CharsetDecoder;
    +import java.nio.charset.StandardCharsets;
    +import java.time.Instant;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +@SupportsBatching
    +@SideEffectFree
    +@Tags({"get", "fetch", "poll", "tcp", "ingest", "source", "input"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@CapabilityDescription("Connects over TCP to the provided server. When 
receiving data this will writes either the" +
    +        " full receive buffer or messages based on demarcator to the 
content of a FlowFile. ")
    +public class GetTCP extends AbstractProcessor {
    +
    +    /**
    +     * Validates a comma delimited list of endpoints of the form
    +     * {@code <host>:<port>{,<host>:<port>}}.
    +     * <p>
    +     * An endpoint is rejected when it is empty, has no {@code :} delimiter, has an
    +     * empty host, has no port, or has a port that is not an integer between 1 and 65535.
    +     * On failure the offending endpoint is reported as the subject of the result.
    +     */
    +    private static final Validator ENDPOINT_VALIDATOR = new Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
    +            if (null == value || value.isEmpty()) {
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
    +            }
    +            //The format should be <host>:<port>{,<host>:<port>}
    +            //first split on ,
    +            final String[] hostPortPairs = value.split(",");
    +
    +            // A value made only of commas splits into an empty array.
    +            if (0 == hostPortPairs.length) {
    +                return new ValidationResult.Builder().subject(subject).input(value).valid(false).explanation(subject + " cannot be empty").build();
    +            }
    +            for (final String hostPortPair : hostPortPairs) {
    +                final String reason = describeProblem(hostPortPair);
    +                if (null != reason) {
    +                    return new ValidationResult.Builder().subject(hostPortPair).input(value).explanation(reason).valid(false).build();
    +                }
    +            }
    +            return new ValidationResult.Builder().subject(subject).input(value).explanation("").valid(true).build();
    +        }
    +
    +        /**
    +         * Checks a single {@code <host>:<port>} endpoint.
    +         *
    +         * @param hostPortPair one element of the comma delimited endpoint list
    +         * @return a description of what is wrong, or {@code null} when the endpoint is acceptable
    +         */
    +        private String describeProblem(final String hostPortPair) {
    +            final int maxPort = 65535;
    +
    +            if (hostPortPair.isEmpty()) {
    +                return "endpoint is empty";
    +            }
    +            if (!hostPortPair.contains(":")) {
    +                return "endpoint pair does not contain valid delimiter";
    +            }
    +            final String[] parts = hostPortPair.split(":");
    +
    +            // split() drops trailing empty strings, so ":" yields zero parts and
    +            // "host:" yields one; both mean there is no port to read.
    +            if (parts.length < 2) {
    +                return "could not determine the port";
    +            }
    +            if (parts[0].isEmpty()) {
    +                return "could not determine the host";
    +            }
    +            try {
    +                final int intVal = Integer.parseInt(parts[1]);
    +
    +                if (intVal <= 0) {
    +                    return "not a positive value";
    +                }
    +                if (intVal > maxPort) {
    +                    return "not a valid port, must be between 1 and " + maxPort;
    +                }
    +            } catch (final NumberFormatException e) {
    +                return "not a valid integer";
    +            }
    +            return null;
    +        }
    +    };
    +
    +    /** Required comma delimited list of {@code <server_address>:<port>} endpoints. */
    +    public static final PropertyDescriptor ENDPOINT_LIST = new PropertyDescriptor.Builder()
    +            .name("Endpoint List")
    +            .description("A comma delimited list of the servers to connect to. The format should be "
    +                    + "<server_address>:<port>. Only one server will be connected to at a time, the others "
    +                    + "will be used as fail overs.")
    +            .required(true)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    /** Optional failover endpoint; no default value is declared for it. */
    +    public static final PropertyDescriptor FAILOVER_ENDPOINT = new PropertyDescriptor.Builder()
    +            .name("Failover Endpoint")
    +            .description("A failover server to connect to if one of the main ones is unreachable after the connection "
    +                    + "attempt count. The format should be <server_address>:<port>.")
    +            .required(false)
    +            .addValidator(ENDPOINT_VALIDATOR)
    +            .build();
    +
    +    /** Optional time period to wait while creating a connection; defaults to "5 sec". */
    +    public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
    +            .name("Connection Timeout")
    +            .description("The amount of time to wait before timing out while creating a connection")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("5 sec")
    +            .build();
    +
    +    /** Optional number of connection attempts per host; defaults to "3". */
    +    public static final PropertyDescriptor CONNECTION_ATTEMPT_COUNT = new PropertyDescriptor.Builder()
    +            .name("Connection Attempt Count")
    +            .description("The number of times to try and establish a connection, before using a backup host if available."
    +                    + " This same attempt count would be used for a backup host as well.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("3")
    +            .build();
    +
    +    /** Optional number of messages written into one flow file; defaults to "10". */
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("Number of Messages in Batch")
    +            .description("The number of messages to write to the flow file content")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10")
    +            .build();
    +
    +    /** Optional receive buffer size, validated against the range 1 to 2048; defaults to "2048". */
    +    public static final PropertyDescriptor RECEIVE_BUFFER_SIZE = new PropertyDescriptor.Builder()
    +            .name("Receive Buffer Size")
    +            .description("The size of the buffer to receive data in")
    +            .required(false)
    +            .defaultValue("2048")
    +            .addValidator(StandardValidators.createLongValidator(1, 2048, true))
    +            .build();
    +
    +    /** Optional TCP keep alive switch limited to "true" or "false"; defaults to "true". */
    +    public static final PropertyDescriptor KEEP_ALIVE = new PropertyDescriptor.Builder()
    +            .name("Keep Alive")
    +            .description("This determines if TCP keep alive is used.")
    +            .required(false)
    +            .defaultValue("true")
    +            .allowableValues("true", "false")
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    /** Relationship that flow files built from received messages are transferred to. */
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("Success")
    +            .description("The relationship that all successful messages from the TCP connection will be sent to")
    +            .build();
    +
    +    /** Relationship reserved for messages that could not be processed. */
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("Failure")
    +            .description("The relationship that all failed messages from the TCP connection will be sent to")
    +            .build();
    +
    +    // Both collections are assigned exactly once by the static initializer.
    +    private final static List<PropertyDescriptor> propertyDescriptors;
    +    private final static Set<Relationship> relationships;
    +    // StandardCharsets.UTF_8 is already a Charset; no lookup by name is needed.
    +    private final static Charset charset = StandardCharsets.UTF_8;
    +    // Dynamic property name -> configured value, filled in onScheduled and
    +    // copied onto every flow file as attributes in onTrigger.
    +    private final Map<String, String> dynamicAttributes = new HashMap<>();
    +    private volatile Set<String> dynamicPropertyNames = new HashSet<>();
    +
    +    // Set once a receiver has been created for the failover endpoint, so that
    +    // onScheduled falls back to it at most once.
    +    private AtomicBoolean connectedToBackup = new AtomicBoolean();
    +
    +
    +    /*
    +    * Will ensure that the list of property descriptors is build only once.
    +    * Will also create a Set of relationships
    +    */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(ENDPOINT_LIST);
    +        _propertyDescriptors.add(FAILOVER_ENDPOINT);
    +        _propertyDescriptors.add(CONNECTION_TIMEOUT);
    +        _propertyDescriptors.add(CONNECTION_ATTEMPT_COUNT);
    +        _propertyDescriptors.add(BATCH_SIZE);
    +        _propertyDescriptors.add(RECEIVE_BUFFER_SIZE);
    +        _propertyDescriptors.add(KEEP_ALIVE);
    +
    +
    +        propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        _relationships.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    // Receiver tasks mapped to the Future returned when each one was submitted to
    +    // executorService; filled in onScheduled and walked in tearDown.
    +    private Map<SocketRecveiverThread, Future> socketToFuture = new 
HashMap<>();
    +    // Fixed thread pool created in onScheduled with one thread per configured endpoint.
    +    private ExecutorService executorService;
    +    // NOTE(review): getLogger() is evaluated during field initialization and the field is
    +    // re-assigned in onScheduled's error path - confirm the logger is available this early.
    +    private ComponentLog log = getLogger();
    +    // Raw "Endpoint List" value; written to the "ServerList" attribute in onTrigger.
    +    private String originalServerAddressList;
    +    // Raw "Failover Endpoint" value read in onScheduled.
    +    private String backupServer;
    +    // "Number of Messages in Batch" value read in onScheduled.
    +    private int batchSize;
    +
    +    /**
    +     * Bounded queue (capacity 256) of messages received from the sockets;
    +     * drained by onTrigger.
    +     */
    +    protected final BlockingQueue<String> socketMessagesReceived = new 
ArrayBlockingQueue<>(256);
    +
    +    /**
    +     * Returns the relationships of this processor.
    +     *
    +     * @return the unmodifiable set holding REL_SUCCESS and REL_FAILURE
    +     */
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    /**
    +     * Returns the fixed (non-dynamic) properties of this processor.
    +     *
    +     * @return the unmodifiable descriptor list built by the static initializer
    +     */
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(true)
    +                .build();
    +    }
    +
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
ProcessException {
    +
    +
    +        final int rcvBufferSize = 
context.getProperty(RECEIVE_BUFFER_SIZE).asInteger();
    +        final boolean keepAlive = 
context.getProperty(KEEP_ALIVE).asBoolean();
    +        final int connectionTimeout = 
context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
    +        final int connectionRetryCount = 
context.getProperty(CONNECTION_ATTEMPT_COUNT).asInteger();
    +        originalServerAddressList = 
context.getProperty(ENDPOINT_LIST).getValue();
    +        final String[] serverAddresses = 
originalServerAddressList.split(",");
    +        backupServer = context.getProperty(FAILOVER_ENDPOINT).getValue();
    +        executorService = 
Executors.newFixedThreadPool(serverAddresses.length);
    +        batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +
    +        for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
    +            final PropertyDescriptor descriptor = entry.getKey();
    +            if (descriptor.isDynamic()) {
    +                dynamicAttributes.put(descriptor.getName(), 
entry.getValue());
    +
    +            }
    +        }
    +
    +        //Go through all of the servers that we need to connect to and 
establish a connection to each of them
    +        //this will result in all of them generating data that goes into 
the flow files.
    +        for (final String hostPortPair : serverAddresses) {
    +            //split pair
    +            final String[] hostAndPort = hostPortPair.split(":");
    +
    +
    +            SocketRecveiverThread socketRecveiverThread = null;
    +            try {
    +                socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, hostAndPort);
    +            } catch (Exception ex) {
    +                // Should we re-throw this or throw a ProcessException? If 
there is only one host in the list  and no backup
    +                // then we should otherwise there will be no data and we 
cannot really run. For a single hosts / port
    +                // pair it may make sense to throw as well and then 
perhaps provide a property to allow the user to decide
    +                // if there are more than one servers in the list do we 
throw or just log it. For now will throw and
    +                // treat it as an error.
    +                log = getLogger();
    +                log.error("Caught exception trying to create thread to 
process data from host: {}", new Object[]{hostAndPort[0], ex});
    +
    +                if (!connectedToBackup.get() && !backupServer.isEmpty()) {
    +                    log.error("Attempting connection to backup server: 
{}", new Object[]{backupServer});
    +                    try {
    +                        final String[] backupHostAndPort = 
backupServer.split(":");
    +                        socketRecveiverThread = 
createSocketRecveiverThread(rcvBufferSize, keepAlive, connectionTimeout, 
connectionRetryCount, backupHostAndPort);
    +                        connectedToBackup.set(true);
    +                    } catch (Exception backupException) {
    +
    +                        log.error("Caught exception trying to connect to 
backup server: ", new Object[]{backupServer, ex});
    +                        throw new ProcessException(String.format("Caught 
exception trying to create thread to process " +
    +                                "data from backup server: %s", 
backupServer), backupException);
    +                    }
    +                } else {
    +                    final String connectionMsg = connectedToBackup.get() ? 
String.format("Already connected to backup %s", backupServer) :
    +                            "No backup server configured to connect to";
    +                    log.info(connectionMsg);
    +                    throw new ProcessException(String.format("Caught 
exception trying to create thread to process data from host: %s", 
hostAndPort[0]), ex);
    +                }
    +            }
    +
    +            getLogger().info("Created thread to process data from host: 
{}", new Object[]{hostAndPort[0]});
    +            final Future socketReceiverFuture = 
executorService.submit(socketRecveiverThread);
    +            socketToFuture.put(socketRecveiverThread, 
socketReceiverFuture);
    +
    +
    +        }
    +
    +    }
    +
    +    protected SocketRecveiverThread createSocketRecveiverThread(int 
rcvBufferSize, boolean keepAlive, int connectionTimeout, int 
connectionRetryCount, String[] hostAndPort) {
    +        SocketRecveiverThread socketRecveiverThread;
    +        socketRecveiverThread = new SocketRecveiverThread(hostAndPort[0],
    +                Integer.parseInt(hostAndPort[1]), backupServer, keepAlive, 
rcvBufferSize, connectionTimeout, connectionRetryCount);
    +        return socketRecveiverThread;
    +    }
    +
    +    /**
    +     * Stops every receiver task and shuts the executor service down.
    +     * <p>
    +     * Each receiver is asked to stop and its future is cancelled. The executor then
    +     * gets five seconds to terminate, after which it is shut down forcibly and given
    +     * another five seconds.
    +     *
    +     * @throws ProcessException if stopping a receiver fails or the wait for
    +     *                          termination is interrupted
    +     */
    +    @OnStopped
    +    public void tearDown() throws ProcessException {
    +        try {
    +            for (Map.Entry<SocketRecveiverThread, Future> socketAndFuture : socketToFuture.entrySet()) {
    +                socketAndFuture.getKey().stopProcessing();
    +                socketAndFuture.getValue().cancel(true);
    +            }
    +            // The receivers are finished; do not stop them again on the next tear down.
    +            socketToFuture.clear();
    +
    +            if (null != executorService) {
    +                executorService.shutdown();
    +                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
    +                    executorService.shutdownNow();
    +
    +                    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
    +                        // Ask for the logger here rather than relying on the cached field,
    +                        // which is only refreshed on onScheduled's error path.
    +                        getLogger().error("Executor service for receiver thread did not terminate");
    +                    }
    +                }
    +            }
    +        } catch (final InterruptedException e) {
    +            // Restore the interrupt flag so callers further up can still see it.
    +            Thread.currentThread().interrupt();
    +            throw new ProcessException(e);
    +        } catch (final Exception e) {
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +
    +        try {
    +            final StringBuilder messages = new StringBuilder();
    +            if (socketMessagesReceived.size() >= batchSize) {
    +                for (int i = 0; i < batchSize; i++) {
    +                    messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +                }
    +            } else {
    +                messages.append(socketMessagesReceived.poll(100, 
TimeUnit.MILLISECONDS));
    +            }
    +            if (0 == messages.length()) {
    +                return;
    +            }
    +            FlowFile flowFile = session.create();
    +
    +            flowFile = session.write(flowFile, out -> 
out.write(messages.toString().getBytes()));
    +            //need to at least put the list of hosts and ports, would be 
nice to know the exact
    +            //host the message came frome -- to do that we need to do one 
of two things
    +            //
    +            // 1. Have a message queue per server
    +            // 2. Add something to the message that then needs to be 
parsed.
    +            //
    +            // Both of these are not ideal, for now just show the whole 
list and someone can still use a
    +            // ROA processor and see if the attribute contains a host.
    +            flowFile = session.putAttribute(flowFile, "ServerList", 
originalServerAddressList);
    +            //add any dynamic properties
    +            if (0 < dynamicAttributes.size()) {
    +                flowFile = session.putAllAttributes(flowFile, 
dynamicAttributes);
    +            }
    +            session.transfer(flowFile, REL_SUCCESS);
    +
    +        } catch (InterruptedException exception) {
    +            throw new ProcessException(exception);
    --- End diff --
    
    Sounds good. Will change the code accordingly


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to