Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2947#discussion_r221729013
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
 ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.queue.clustered.server;
    +
    +import org.apache.nifi.engine.FlowEngine;
    +import org.apache.nifi.events.EventReporter;
    +import org.apache.nifi.reporting.Severity;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.net.ssl.SSLContext;
    +import javax.net.ssl.SSLServerSocket;
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.ServerSocket;
    +import java.net.Socket;
    +import java.net.SocketTimeoutException;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +public class ConnectionLoadBalanceServer {
    +    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
    +
    +    private final String hostname; // resolved via InetAddress.getByName() in createServerSocket() to pick the bind address
    +    private final int port; // port handed to the server socket when it is created
    +    private final SSLContext sslContext; // null => plain ServerSocket; non-null => SSL server socket that requires client auth
    +    private final ExecutorService threadPool; // runs the CommunicateAction tasks; shut down in stop()
    +    private final LoadBalanceProtocol loadBalanceProtocol; // handed to every CommunicateAction, which calls receiveFlowFiles(socket) on it
    +    private final int connectionTimeoutMillis; // stored by the constructor; not read in the visible part of this class
    +    private final int numThreads; // thread pool size and number of CommunicateAction tasks submitted by start()
    +    private final EventReporter eventReporter; // stored by the constructor; not read in the visible part of this class
    +
    +    private volatile Set<CommunicateAction> communicationActions = 
Collections.emptySet(); // replaced by start() with the submitted actions; each one is stopped in stop()
    +    private final BlockingQueue<Socket> connectionQueue = new 
LinkedBlockingQueue<>(); // sockets polled by CommunicateAction; drained and closed in stop(). Presumably filled by AcceptConnection - TODO confirm
    +
    +    private volatile AcceptConnection acceptConnection; // created in start(), stopped in stop(); null until start() gets that far
    +    private volatile ServerSocket serverSocket; // created in start(); null until then
    +    private volatile boolean stopped = true; // initially true; start() returns immediately unless this is true
    +
    +    /**
    +     * Creates a load-balance server. Nothing is bound or listening until {@link #start()} is called;
    +     * the constructor only records its configuration and creates the worker thread pool.
    +     *
    +     * @param hostname host name that is resolved to the address the server socket binds to
    +     * @param port port passed to the server socket when it is created
    +     * @param sslContext if non-null, used to create an SSL server socket that requires client
    +     *                   authentication; if null, a plain server socket is created
    +     * @param numThreads size of the thread pool and number of communication tasks submitted on start
    +     * @param loadBalanceProtocol protocol handed to each communication task for receiving FlowFiles
    +     * @param eventReporter reporter retained by this server
    +     * @param connectionTimeoutMillis timeout value retained by this server (presumably applied to
    +     *                                accepted connections - TODO confirm against the rest of the class)
    +     */
    +    public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext, final int numThreads,
    +                                       final LoadBalanceProtocol loadBalanceProtocol, final EventReporter eventReporter,
    +                                       final int connectionTimeoutMillis) {
    +        // Where to listen and how to secure the listener.
    +        this.hostname = hostname;
    +        this.port = port;
    +        this.sslContext = sslContext;
    +
    +        // Collaborators.
    +        this.loadBalanceProtocol = loadBalanceProtocol;
    +        this.eventReporter = eventReporter;
    +
    +        // Sizing and timeouts.
    +        this.numThreads = numThreads;
    +        this.connectionTimeoutMillis = connectionTimeoutMillis;
    +
    +        this.threadPool = new FlowEngine(numThreads, "Load Balance Server");
    +    }
    +
    +    /**
    +     * Starts the server: creates the server socket, submits {@code numThreads} communication tasks
    +     * to the thread pool and launches the thread that accepts incoming connections.
    +     * Returns immediately if the server is not currently marked as stopped, or if a server socket
    +     * has already been created.
    +     *
    +     * @throws IOException if the server socket cannot be created; the server is left marked as
    +     *                     stopped so that a later call can try again
    +     */
    +    public void start() throws IOException {
    +        if (!stopped) {
    +            return;
    +        }
    +
    +        stopped = false;
    +        if (serverSocket != null) {
    +            return;
    +        }
    +
    +        try {
    +            serverSocket = createServerSocket();
    +        } catch (final Exception e) {
    +            // Nothing is listening, so restore the flag. Otherwise every later start() would hit the
    +            // "!stopped" guard above and return without ever retrying.
    +            stopped = true;
    +
    +            throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the " +
    +                    "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
    +        }
    +
    +        // One communication task per pool thread; each task polls the shared connection queue.
    +        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
    +        for (int i = 0; i < numThreads; i++) {
    +            final CommunicateAction action = new CommunicateAction(loadBalanceProtocol);
    +            actions.add(action);
    +            threadPool.submit(action);
    +        }
    +
    +        // Publish the set so that stop() can signal every task.
    +        this.communicationActions = actions;
    +
    +        acceptConnection = new AcceptConnection(serverSocket);
    +        final Thread receiveConnectionThread = new Thread(acceptConnection);
    +        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
    +        receiveConnectionThread.start();
    +    }
    +
    +    /**
    +     * Returns the port this server is listening on.
    +     *
    +     * @return the local port of the server socket once {@link #start()} has created it; before
    +     *         that, the port this server was configured with
    +     */
    +    public int getPort() {
    +        // Read the volatile field once: it stays null until start() has created the socket,
    +        // and dereferencing it directly would throw a NullPointerException.
    +        final ServerSocket socket = serverSocket;
    +        return socket == null ? port : socket.getLocalPort();
    +    }
    +
    +    /**
    +     * Stops the server: marks it as stopped, shuts down the thread pool, stops the
    +     * connection-accepting task and every communication task, then closes any sockets that are
    +     * still waiting in the connection queue. A failure to close one socket is logged and does not
    +     * prevent the remaining sockets from being closed.
    +     */
    +    public void stop() {
    +        // Record that the server is stopped (this flag was previously set to false here, which
    +        // left the server looking as if it were still running).
    +        stopped = true;
    +        threadPool.shutdown();
    +
    +        if (acceptConnection != null) {
    +            acceptConnection.stop();
    +        }
    +
    +        communicationActions.forEach(CommunicateAction::stop);
    +
    +        // Close connections that were queued but never picked up by a communication task.
    +        Socket socket;
    +        while ((socket = connectionQueue.poll()) != null) {
    +            try {
    +                socket.close();
    +                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
    +            } catch (final IOException ioe) {
    +                logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Creates the server socket for the configured port with a backlog of 50.
    +     * When no SSL context is configured a plain {@link ServerSocket} is returned; otherwise an SSL
    +     * server socket that requires client authentication is returned.
    +     *
    +     * @return the newly created server socket
    +     * @throws IOException if the host name cannot be resolved or the socket cannot be created
    +     */
    +    private ServerSocket createServerSocket() throws IOException {
    +        // A null host name yields a null bind address, which the socket constructors treat as "any local address".
    +        final InetAddress inetAddress = hostname == null ? null : InetAddress.getByName(hostname);
    +
    +        if (sslContext == null) {
    +            // Reuse the address resolved above. Calling InetAddress.getByName(null) here would return the
    +            // loopback address, so the plain socket would bind differently from the SSL socket below.
    +            return new ServerSocket(port, 50, inetAddress);
    +        } else {
    +            final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress);
    +            ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
    +            return serverSocket;
    +        }
    +    }
    +
    +
    +    private class CommunicateAction implements Runnable {
    +        private final LoadBalanceProtocol loadBalanceProtocol;
    +        private volatile boolean stopped = false;
    +
    +        public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol) {
    +            this.loadBalanceProtocol = loadBalanceProtocol;
    +        }
    +
    +        public void stop() {
    +            this.stopped = true;
    +        }
    +
    +        @Override
    +        public void run() {
    +            String peerDescription = "<Unknown Client>";
    +
    +            while (!stopped) {
    +                Socket socket = null;
    +                try {
    +                    socket = connectionQueue.poll(1, TimeUnit.SECONDS);
    +                    if (socket == null) {
    +                        continue;
    +                    }
    +
    +                    peerDescription = 
socket.getRemoteSocketAddress().toString();
    +
    +                    if (socket.isClosed()) {
    +                        logger.debug("Connection to Peer {} is closed. 
Will not attempt to communicate over this Socket.", peerDescription);
    +                        continue;
    +                    }
    +
    +                    logger.debug("Receiving FlowFiles from Peer {}", 
peerDescription);
    +                    loadBalanceProtocol.receiveFlowFiles(socket);
    --- End diff --
    
    @ijokarumawak please ignore my comment above :) After thinking about it a 
good bit more, I realized that the way I was thinking about it previously was 
far more complicated than it needed to be. I have pushed one more commit that 
will take backpressure into account.
    
    It works by first checking with the remote node to see if there is space in 
the queue or if backpressure is applied. If backpressure is applied, the 
'client' node won't send data. This will cause the data to queue up on the 
'client' node, which will cause backpressure to engage there as well.


---

Reply via email to