Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2947#discussion_r221641452
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+public class ConnectionLoadBalanceServer {
+ private static final Logger logger =
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
+
+ private final String hostname;
+ private final int port;
+ private final SSLContext sslContext;
+ private final ExecutorService threadPool;
+ private final LoadBalanceProtocol loadBalanceProtocol;
+ private final int connectionTimeoutMillis;
+ private final int numThreads;
+ private final EventReporter eventReporter;
+
+ private volatile Set<CommunicateAction> communicationActions =
Collections.emptySet();
+ private final BlockingQueue<Socket> connectionQueue = new
LinkedBlockingQueue<>();
+
+ private volatile AcceptConnection acceptConnection;
+ private volatile ServerSocket serverSocket;
+ private volatile boolean stopped = true;
+
+    /**
+     * Captures the configuration for the load-balance server and creates the worker pool.
+     * No socket is opened by the constructor.
+     *
+     * @param hostname host name to listen on; may be {@code null}
+     * @param port port to listen on
+     * @param sslContext context used to create a TLS server socket, or {@code null} for a plain socket
+     * @param numThreads size of the worker pool and number of communication tasks submitted on start
+     * @param loadBalanceProtocol protocol handed to every communication task
+     * @param eventReporter reporter retained for later use by this server
+     * @param connectionTimeoutMillis connection timeout in milliseconds, retained for later use
+     */
+    public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext,
+                                       final int numThreads, final LoadBalanceProtocol loadBalanceProtocol,
+                                       final EventReporter eventReporter, final int connectionTimeoutMillis) {
+        // Where to listen and how to secure the listener.
+        this.hostname = hostname;
+        this.port = port;
+        this.sslContext = sslContext;
+
+        // Collaborators.
+        this.loadBalanceProtocol = loadBalanceProtocol;
+        this.eventReporter = eventReporter;
+
+        // Tuning values.
+        this.connectionTimeoutMillis = connectionTimeoutMillis;
+        this.numThreads = numThreads;
+
+        this.threadPool = new FlowEngine(numThreads, "Load Balance Server");
+    }
+
+    /**
+     * Opens the server socket, submits {@code numThreads} communication tasks to the thread pool
+     * and starts the thread that accepts incoming connections.
+     * Returns immediately if the server is not currently stopped, or if a server socket already exists.
+     *
+     * @throws IOException if the server socket cannot be created
+     */
+    public void start() throws IOException {
+        if (!stopped) {
+            return;
+        }
+
+        stopped = false;
+        if (serverSocket != null) {
+            return;
+        }
+
+        try {
+            serverSocket = createServerSocket();
+        } catch (final Exception e) {
+            // Nothing is listening, so restore the flag; otherwise the guard above would turn
+            // every later start() attempt into a silent no-op.
+            stopped = true;
+            throw new IOException("Could not begin listening for incoming connections in order to load balance data "
+                + "across the cluster. Please verify the values of the "
+                + "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties "
+                + "as well as the 'nifi.security.*' properties", e);
+        }
+
+        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            final CommunicateAction action = new CommunicateAction(loadBalanceProtocol);
+            actions.add(action);
+            threadPool.submit(action);
+        }
+
+        // Publish the complete set in one volatile write so stop() never sees a partially filled set.
+        this.communicationActions = actions;
+
+        acceptConnection = new AcceptConnection(serverSocket);
+        final Thread receiveConnectionThread = new Thread(acceptConnection);
+        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
+        receiveConnectionThread.start();
+    }
+
+    /**
+     * Reports the local port of the current server socket.
+     *
+     * @return the value of {@code getLocalPort()} on the server socket
+     */
+    public int getPort() {
+        final ServerSocket listeningSocket = serverSocket;
+        return listeningSocket.getLocalPort();
+    }
+
+    /**
+     * Stops the server: marks it as stopped, shuts down the thread pool, stops the
+     * connection acceptor (if one was created) and every communication task, then
+     * closes any sockets still waiting in the connection queue.
+     */
+    public void stop() {
+        // The flag starts out true and start() sets it to false, so stopping must set it back to true.
+        stopped = true;
+        threadPool.shutdown();
+
+        if (acceptConnection != null) {
+            acceptConnection.stop();
+        }
+
+        communicationActions.forEach(CommunicateAction::stop);
+
+        // Drain connections that were accepted but never picked up by a communication task.
+        Socket socket;
+        while ((socket = connectionQueue.poll()) != null) {
+            try {
+                socket.close();
+                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
+            } catch (final IOException ioe) {
+                logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe);
+            }
+        }
+    }
+
+    /**
+     * Creates the socket on which incoming load-balance connections are accepted.
+     * A plain {@link ServerSocket} is created when no SSL context is configured; otherwise the
+     * socket comes from the SSL context's server socket factory and requires client authentication.
+     * Both variants use a backlog of 50 and the same bind address.
+     *
+     * @return the newly created server socket
+     * @throws IOException if the host name cannot be resolved or the socket cannot be created
+     */
+    private ServerSocket createServerSocket() throws IOException {
+        // A null hostname must stay a null bind address: InetAddress.getByName(null) would resolve
+        // to the loopback interface instead.
+        final InetAddress inetAddress = hostname == null ? null : InetAddress.getByName(hostname);
+
+        if (sslContext == null) {
+            // Use the address resolved above so the plain and TLS sockets bind identically.
+            return new ServerSocket(port, 50, inetAddress);
+        } else {
+            final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress);
+            ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
+            return serverSocket;
+        }
+    }
+
+
+ private class CommunicateAction implements Runnable {
+ private final LoadBalanceProtocol loadBalanceProtocol;
+ private volatile boolean stopped = false;
+
+ public CommunicateAction(final LoadBalanceProtocol
loadBalanceProtocol) {
+ this.loadBalanceProtocol = loadBalanceProtocol;
+ }
+
+ public void stop() {
+ this.stopped = true;
+ }
+
+ @Override
+ public void run() {
+ String peerDescription = "<Unknown Client>";
+
+ while (!stopped) {
+ Socket socket = null;
+ try {
+ socket = connectionQueue.poll(1, TimeUnit.SECONDS);
+ if (socket == null) {
+ continue;
+ }
+
+ peerDescription =
socket.getRemoteSocketAddress().toString();
+
+ if (socket.isClosed()) {
+ logger.debug("Connection to Peer {} is closed.
Will not attempt to communicate over this Socket.", peerDescription);
+ continue;
+ }
+
+ logger.debug("Receiving FlowFiles from Peer {}",
peerDescription);
+ loadBalanceProtocol.receiveFlowFiles(socket);
--- End diff --
In addition to documentation, I do think it would be nice in the UI, when
configuring Load Balancing, to indicate that Backpressure Thresholds may not be
honored when using load balancing. I also created NIFI-5651 to track this.
---