[ https://issues.apache.org/jira/browse/NIFI-5516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634150#comment-16634150 ]
ASF GitHub Bot commented on NIFI-5516: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2947#discussion_r221641452 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java --- @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + + +public class ConnectionLoadBalanceServer { + private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class); + + private final String hostname; + private final int port; + private final SSLContext sslContext; + private final ExecutorService threadPool; + private final LoadBalanceProtocol loadBalanceProtocol; + private final int connectionTimeoutMillis; + private final int numThreads; + private final EventReporter eventReporter; + + private volatile Set<CommunicateAction> communicationActions = Collections.emptySet(); + private final BlockingQueue<Socket> connectionQueue = new LinkedBlockingQueue<>(); + + private volatile AcceptConnection acceptConnection; + private volatile ServerSocket serverSocket; + private volatile boolean stopped = true; + + public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext, final int numThreads, final LoadBalanceProtocol loadBalanceProtocol, + final EventReporter eventReporter, final int connectionTimeoutMillis) { + this.hostname = hostname; + this.port = port; + this.sslContext = sslContext; + this.loadBalanceProtocol = loadBalanceProtocol; + this.connectionTimeoutMillis = connectionTimeoutMillis; + this.numThreads 
= numThreads; + this.eventReporter = eventReporter; + + threadPool = new FlowEngine(numThreads, "Load Balance Server"); + } + + public void start() throws IOException { + if (!stopped) { + return; + } + + stopped = false; + if (serverSocket != null) { + return; + } + + try { + serverSocket = createServerSocket(); + } catch (final Exception e) { + throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the " + + "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e); + } + + final Set<CommunicateAction> actions = new HashSet<>(numThreads); + for (int i=0; i < numThreads; i++) { + final CommunicateAction action = new CommunicateAction(loadBalanceProtocol); + actions.add(action); + threadPool.submit(action); + } + + this.communicationActions = actions; + + acceptConnection = new AcceptConnection(serverSocket); + final Thread receiveConnectionThread = new Thread(acceptConnection); + receiveConnectionThread.setName("Receive Queue Load-Balancing Connections"); + receiveConnectionThread.start(); + } + + public int getPort() { + return serverSocket.getLocalPort(); + } + + public void stop() { + stopped = false; + threadPool.shutdown(); + + if (acceptConnection != null) { + acceptConnection.stop(); + } + + communicationActions.forEach(CommunicateAction::stop); + + Socket socket; + while ((socket = connectionQueue.poll()) != null) { + try { + socket.close(); + logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress()); + } catch (final IOException ioe) { + logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe); + } + } + } + + private ServerSocket createServerSocket() throws IOException { + final InetAddress inetAddress = hostname == null ? 
null : InetAddress.getByName(hostname); + + if (sslContext == null) { + return new ServerSocket(port, 50, InetAddress.getByName(hostname)); + } else { + final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress); + ((SSLServerSocket) serverSocket).setNeedClientAuth(true); + return serverSocket; + } + } + + + private class CommunicateAction implements Runnable { + private final LoadBalanceProtocol loadBalanceProtocol; + private volatile boolean stopped = false; + + public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol) { + this.loadBalanceProtocol = loadBalanceProtocol; + } + + public void stop() { + this.stopped = true; + } + + @Override + public void run() { + String peerDescription = "<Unknown Client>"; + + while (!stopped) { + Socket socket = null; + try { + socket = connectionQueue.poll(1, TimeUnit.SECONDS); + if (socket == null) { + continue; + } + + peerDescription = socket.getRemoteSocketAddress().toString(); + + if (socket.isClosed()) { + logger.debug("Connection to Peer {} is closed. Will not attempt to communicate over this Socket.", peerDescription); + continue; + } + + logger.debug("Receiving FlowFiles from Peer {}", peerDescription); + loadBalanceProtocol.receiveFlowFiles(socket); --- End diff -- In addition to documentation, I do think it would be nice in the UI, when configuring Load Balancing, to indicate that Backpressure Thresholds may not be honored when using load balancing. I also created NIFI-5651 to track this. > Allow data in a Connection to be Load-Balanced across cluster > ------------------------------------------------------------- > > Key: NIFI-5516 > URL: https://issues.apache.org/jira/browse/NIFI-5516 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework > Reporter: Mark Payne > Assignee: Mark Payne > Priority: Major > > Allow user to configure a Connection to be load balanced across the cluster. 
> For more information, see the Feature Proposal at > https://cwiki.apache.org/confluence/display/NIFI/Load-Balanced+Connections -- This message was sent by Atlassian JIRA (v7.6.3#76005)