http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index e31a547..9eaffd3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -16,15 +16,6 @@ */ package org.apache.nifi.cluster.protocol.impl; -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.security.cert.CertificateException; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; @@ -49,6 +40,22 @@ import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + /** * Implements a listener for protocol messages sent over unicast socket. * @@ -82,7 +89,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi @Override public void start() throws IOException { - if (super.isRunning()) { throw new IllegalStateException("Instance is already started."); } @@ -92,7 +98,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi @Override public void stop() throws IOException { - if (super.isRunning() == false) { throw new IOException("Instance is already stopped."); } @@ -128,8 +133,6 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi final String requestId = UUID.randomUUID().toString(); logger.debug("Received request {} from {}", requestId, hostname); - String requestorDn = getRequestorDN(socket); - // unmarshall message final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream()); @@ -151,7 +154,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } } - request.setRequestorDN(requestorDn); + final Set<String> nodeIdentities = getCertificateIdentities(socket); // dispatch message to handler ProtocolHandler desiredHandler = null; @@ -168,7 +171,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi logger.error("Received request of type {} but none of the following Protocol Handlers were able to process the request: {}", request.getType(), handlers); throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); } else { - final ProtocolMessage response = desiredHandler.handle(request); + final ProtocolMessage response = desiredHandler.handle(request, nodeIdentities); if (response != null) { try { logger.debug("Sending response for request {}", requestId); @@ -218,11 +221,32 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi } } - private String getRequestorDN(Socket socket) { - try { - return CertificateUtils.extractPeerDNFromSSLSocket(socket); - } catch (CertificateException e) { - throw new ProtocolException(e); + private Set<String> getCertificateIdentities(final Socket socket) throws IOException { + if (socket instanceof SSLSocket) { + try { + final SSLSession sslSession = ((SSLSocket) socket).getSession(); + return getCertificateIdentities(sslSession); + } catch (CertificateException e) { + throw new IOException("Could not extract Subject Alternative Names from client's certificate", e); + } + } else { + return Collections.emptySet(); + } + } + + private Set<String> getCertificateIdentities(final SSLSession sslSession) throws CertificateException, SSLPeerUnverifiedException { + final Certificate[] certs = sslSession.getPeerCertificates(); + if (certs == null || certs.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); } + + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]); + cert.checkValidity(); + + final Set<String> identities = CertificateUtils.getSubjectAlternativeNames(cert).stream() + .map(CertificateUtils::extractUsername) + .collect(Collectors.toSet()); + + return identities; } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java index a2d9968..dbc988b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java @@ -25,6 +25,8 @@ public class AdaptedNodeIdentifier { private int apiPort; private String socketAddress; private int socketPort; + private String loadBalanceAddress; + private int loadBalancePort; private String siteToSiteAddress; private Integer siteToSitePort; private Integer siteToSiteHttpApiPort; @@ -74,6 +76,22 @@ public class AdaptedNodeIdentifier { this.socketPort = socketPort; } + public String getLoadBalanceAddress() { + return loadBalanceAddress; + } + + public void setLoadBalanceAddress(final String loadBalanceAddress) { + this.loadBalanceAddress = loadBalanceAddress; + } + + public int getLoadBalancePort() { + return loadBalancePort; + } + + public void setLoadBalancePort(final int loadBalancePort) { + this.loadBalancePort = loadBalancePort; + } + public String getSiteToSiteAddress() { return siteToSiteAddress; } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java index 4a2660f..29aa451 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java @@ -34,6 +34,8 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod aNi.setApiPort(ni.getApiPort()); aNi.setSocketAddress(ni.getSocketAddress()); aNi.setSocketPort(ni.getSocketPort()); + aNi.setLoadBalanceAddress(ni.getLoadBalanceAddress()); + aNi.setLoadBalancePort(ni.getLoadBalancePort()); aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress()); aNi.setSiteToSitePort(ni.getSiteToSitePort()); aNi.setSiteToSiteHttpApiPort(ni.getSiteToSiteHttpApiPort()); @@ -47,7 +49,7 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod if (aNi == null) { return null; } else { - return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), + return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), aNi.getLoadBalanceAddress(), aNi.getLoadBalancePort(), aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(),aNi.getSiteToSiteHttpApiPort(), aNi.isSiteToSiteSecure()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index 1cab62f..482f5d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -18,8 +18,6 @@ package org.apache.nifi.cluster.protocol.message; public abstract class ProtocolMessage { - private volatile String requestorDN; - public static enum MessageType { CONNECTION_REQUEST, CONNECTION_RESPONSE, @@ -42,21 +40,4 @@ public abstract class ProtocolMessage { public abstract MessageType getType(); - /** - * Sets the DN of the entity making the request - * - * @param dn dn of the entity making the request - */ - public void setRequestorDN(final String dn) { - this.requestorDN = dn; - } - - /** - * @return the DN of the entity that made the request, if using a secure - * socket. Otherwise, returns <code>null</code> - */ - public String getRequestorDN() { - return requestorDN; - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml index 63ab689..e2d5bf2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml @@ -27,17 +27,17 @@ <util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/> </constructor-arg> </bean> - + <!-- socket configuration --> <bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean"> <property name="properties" ref="nifiProperties"/> </bean> - + <!-- server socket configuration --> <bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean"> <property name="properties" ref="nifiProperties"/> </bean> - + <!-- cluster manager protocol sender --> <bean id="clusterCoordinationProtocolSender" class="org.apache.nifi.cluster.protocol.impl.StandardClusterCoordinationProtocolSender"> <constructor-arg ref="protocolSocketConfiguration"/> @@ -49,13 +49,13 @@ <bean factory-bean="nifiProperties" factory-method="getClusterNodeConnectionTimeout"/> </property> </bean> - + <!-- cluster manager sender/listener --> <bean id="clusterCoordinationProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener"> <constructor-arg ref="clusterCoordinationProtocolSender"/> <constructor-arg ref="protocolListener"/> </bean> - + <!-- node protocol sender --> <!-- <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender"> @@ -69,7 +69,7 @@ <constructor-arg ref="protocolContext"/> <constructor-arg ref="leaderElectionManager"/> </bean> - + <!-- protocol listener --> <bean id="protocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> <constructor-arg index="0"> @@ -81,7 +81,7 @@ <constructor-arg ref="protocolServerSocketConfiguration" index="2"/> <constructor-arg ref="protocolContext" index="3"/> </bean> - + <!-- node sender/listener --> <bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener"> <constructor-arg ref="nodeProtocolSender"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java index d6d83ef..aff4b11 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.cluster.protocol.impl.testutils; -import java.util.ArrayList; -import java.util.List; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + /** */ public class DelayedProtocolHandler implements ProtocolHandler { @@ -34,7 +36,7 @@ public class DelayedProtocolHandler implements ProtocolHandler { } @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException { try { messages.add(msg); Thread.sleep(delay); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java index ccf2c4c..05d5b77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.cluster.protocol.impl.testutils; -import java.util.ArrayList; -import java.util.List; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + /** */ public class ReflexiveProtocolHandler implements ProtocolHandler { @@ -29,7 +31,7 @@ public class ReflexiveProtocolHandler implements ProtocolHandler { private List<ProtocolMessage> messages = new ArrayList<>(); @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + public ProtocolMessage handle(ProtocolMessage msg, Set<String> nodeIdentities) throws ProtocolException { messages.add(msg); return msg; } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 35bf510..4395883 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -29,6 +29,7 @@ import org.apache.nifi.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -198,7 +199,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { final NodeIdentifier nodeId = heartbeat.getNodeIdentifier(); // Do not process heartbeat if it's blocked by firewall. - if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) { + if (clusterCoordinator.isBlockedByFirewall(Collections.singleton(nodeId.getSocketAddress()))) { clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request."); // request node to disconnect http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 2d6f023..43f3f2b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.cluster.coordination.heartbeat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -46,6 +36,17 @@ import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + /** * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and * then relies on the NiFi Cluster Protocol to receive heartbeat messages from @@ -134,7 +135,7 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } @Override - public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { + public ProtocolMessage handle(final ProtocolMessage msg, Set<String> nodeIds) throws ProtocolException { switch (msg.getType()) { case HEARTBEAT: return handleHeartbeat((HeartbeatMessage) msg); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 4e4625c..484d155 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -16,13 +16,19 @@ */ package org.apache.nifi.cluster.coordination.node; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; import org.apache.nifi.cluster.coordination.flow.FlowElection; import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; +import org.apache.nifi.cluster.coordination.node.state.NodeIdentifierDescriptor; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.NodeEvent; import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; @@ -49,8 +55,14 @@ import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.Severity; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.NiFiProperties; @@ -59,6 +71,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -69,6 +82,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -93,6 +107,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final AtomicLong latestUpdateId = new AtomicLong(-1); private final FlowElection flowElection; private final NodeProtocolSender nodeProtocolSender; + private final StateManager stateManager; private volatile FlowService flowService; private volatile boolean connected; @@ -102,9 +117,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>(); private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>(); + private final List<ClusterTopologyEventListener> eventListeners = new CopyOnWriteArrayList<>(); + + public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager, + final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties, + final NodeProtocolSender nodeProtocolSender) throws IOException { + this(senderListener, eventReporter, leaderElectionManager, flowElection, firewall, revisionManager, nifiProperties, nodeProtocolSender, + StandardStateManagerProvider.create(nifiProperties, VariableRegistry.EMPTY_REGISTRY)); + } + public NodeClusterCoordinator(final ClusterCoordinationProtocolSenderListener senderListener, final EventReporter eventReporter, final LeaderElectionManager leaderElectionManager, final FlowElection flowElection, final ClusterNodeFirewall firewall, final RevisionManager revisionManager, final NiFiProperties nifiProperties, - final NodeProtocolSender nodeProtocolSender) { + final NodeProtocolSender nodeProtocolSender, final StateManagerProvider stateManagerProvider) throws IOException { this.senderListener = senderListener; this.flowService = null; this.eventReporter = eventReporter; @@ -114,10 +138,98 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl this.leaderElectionManager = leaderElectionManager; this.flowElection = flowElection; this.nodeProtocolSender = nodeProtocolSender; + this.stateManager = stateManagerProvider.getStateManager("Cluster Coordinator"); + + recoverState(); senderListener.addHandler(this); } + private void recoverState() throws IOException { + final StateMap stateMap = stateManager.getState(Scope.LOCAL); + if (stateMap == null) { + logger.debug("No state to restore"); + return; + } + + final ObjectMapper mapper = new ObjectMapper(); + final JsonFactory jsonFactory = new JsonFactory(); + jsonFactory.setCodec(mapper); + + final Map<NodeIdentifier, NodeConnectionStatus> connectionStatusMap = new HashMap<>(); + NodeIdentifier localNodeId = null; + + final Map<String, String> state = stateMap.toMap(); + for (final Map.Entry<String, String> entry : state.entrySet()) { + final String nodeUuid = entry.getKey(); + final String nodeIdentifierJson = entry.getValue(); + logger.debug("Recovering state for {} = {}", nodeUuid, nodeIdentifierJson); + + try (final JsonParser jsonParser = jsonFactory.createParser(nodeIdentifierJson)) { + final NodeIdentifierDescriptor nodeIdDesc = jsonParser.readValueAs(NodeIdentifierDescriptor.class); + final NodeIdentifier nodeId = nodeIdDesc.toNodeIdentifier(); + + connectionStatusMap.put(nodeId, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); + if (nodeIdDesc.isLocalNodeIdentifier()) { + if (localNodeId == null) { + localNodeId = nodeId; + } else { + logger.warn("When recovering state, determined that tgwo Node Identifiers claim to be the local Node Identifier: {} and {}. Will ignore both of these and wait until " + + "connecting to cluster to determine which Node Identiifer is the local Node Identifier", localNodeId, nodeId); + localNodeId = null; + } + } + } + } + + if (!connectionStatusMap.isEmpty()) { + resetNodeStatuses(connectionStatusMap); + } + + if (localNodeId != null) { + logger.debug("Recovered state indicating that Local Node Identifier is {}", localNodeId); + setLocalNodeIdentifier(localNodeId); + } + } + + private void storeState() { + final ObjectMapper mapper = new ObjectMapper(); + final JsonFactory jsonFactory = new JsonFactory(); + jsonFactory.setCodec(mapper); + + try { + final Map<String, String> stateMap = new HashMap<>(); + + final NodeIdentifier localNodeId = getLocalNodeIdentifier(); + for (final NodeIdentifier nodeId : getNodeIdentifiers()) { + final boolean isLocalId = nodeId.equals(localNodeId); + final NodeIdentifierDescriptor descriptor = NodeIdentifierDescriptor.fromNodeIdentifier(nodeId, isLocalId); + + try (final StringWriter writer = new StringWriter()) { + final JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); + jsonGenerator.writeObject(descriptor); + + final String serializedDescriptor = writer.toString(); + stateMap.put(nodeId.getId(), serializedDescriptor); + } + } + + stateManager.setState(stateMap, Scope.LOCAL); + logger.debug("Stored the following state as the Cluster Topology: {}", stateMap); + } catch (final Exception e) { + logger.warn("Failed to store cluster topology to local State Manager. Upon restart of NiFi, the cluster topology may not be accurate until joining the cluster.", e); + } + } + + + public void registerEventListener(final ClusterTopologyEventListener eventListener) { + this.eventListeners.add(eventListener); + } + + public void unregisterEventListener(final ClusterTopologyEventListener eventListener) { + this.eventListeners.remove(eventListener); + } + @Override public void shutdown() { if (closed) { @@ -136,8 +248,13 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void setLocalNodeIdentifier(final NodeIdentifier nodeId) { + if (nodeId == null || nodeId.equals(this.nodeId)) { + return; + } + this.nodeId = nodeId; nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED)); + eventListeners.forEach(listener -> listener.onLocalNodeIdentifierSet(nodeId)); } @Override @@ -170,7 +287,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return localNodeId; } - private String getElectedActiveCoordinatorAddress() throws IOException { + private String getElectedActiveCoordinatorAddress() { return leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); } @@ -185,11 +302,62 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeConnectionStatus proposedStatus = entry.getValue(); if (proposedStatus.getState() == NodeConnectionState.REMOVED) { - nodeStatuses.remove(nodeId); + removeNode(nodeId); } else { - nodeStatuses.put(nodeId, proposedStatus); + updateNodeStatus(nodeId, proposedStatus, false); } } + + storeState(); + } + + private NodeConnectionStatus removeNode(final NodeIdentifier nodeId) { + final NodeConnectionStatus status = nodeStatuses.remove(nodeId); + nodeEvents.remove(nodeId); + if (status != null) { + onNodeRemoved(nodeId); + } + + return status; + } + + private boolean removeNodeConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus) { + final boolean removed = nodeStatuses.remove(nodeId, expectedStatus); + if (removed) { + nodeEvents.remove(nodeId); + onNodeRemoved(nodeId); + } + + return removed; + } + + private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) { + return updateNodeStatus(nodeId, updatedStatus, true); + } + + private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) { + final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, updatedStatus); + if (evictedStatus == null) { + onNodeAdded(nodeId, storeState); + } + + return evictedStatus; + } + + private boolean updateNodeStatusConditionally(final NodeIdentifier nodeId, final NodeConnectionStatus expectedStatus, final NodeConnectionStatus updatedStatus) { + final boolean updated; + if (expectedStatus == null) { + final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, updatedStatus); + updated = existingValue == null; + + if (updated) { + onNodeAdded(nodeId, true); + } + } else { + updated = nodeStatuses.replace(nodeId, expectedStatus, updatedStatus); + } + + return updated; } @Override @@ -228,17 +396,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl if (currentStatus == null) { if (newStatus.getState() == NodeConnectionState.REMOVED) { - return nodeStatuses.remove(nodeId, currentStatus); + return removeNodeConditionally(nodeId, currentStatus); } else { - final NodeConnectionStatus existingValue = nodeStatuses.putIfAbsent(nodeId, newStatus); - return existingValue == null; + return updateNodeStatusConditionally(nodeId, null, newStatus); } } if (newStatus.getState() == NodeConnectionState.REMOVED) { - return nodeStatuses.remove(nodeId, currentStatus); + if (removeNodeConditionally(nodeId, currentStatus)) { + storeState(); + return true; + } else { + return false; + } } else { - return nodeStatuses.replace(nodeId, currentStatus, newStatus); + return updateNodeStatusConditionally(nodeId, currentStatus, newStatus); } } @@ -348,9 +520,23 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl @Override public void removeNode(final NodeIdentifier nodeId, final String userDn) { reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster"); - nodeStatuses.remove(nodeId); - nodeEvents.remove(nodeId); notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED)); + removeNode(nodeId); + + storeState(); + } + + private void onNodeRemoved(final NodeIdentifier nodeId) { + eventListeners.stream().forEach(listener -> listener.onNodeRemoved(nodeId)); + } + + private void onNodeAdded(final NodeIdentifier nodeId, final boolean storeState) { + if (storeState) { + storeState(); + } + + + eventListeners.stream().forEach(listener -> listener.onNodeAdded(nodeId)); } @Override @@ -381,8 +567,18 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } @Override - public boolean isBlockedByFirewall(final String hostname) { - return firewall != null && !firewall.isPermissible(hostname); + public boolean isBlockedByFirewall(final Set<String> nodeIdentities) { + if (firewall == null) { + return false; + } + + for (final String nodeId : nodeIdentities) { + if (firewall.isPermissible(nodeId)) { + return false; + } + } + + return true; } @Override @@ -455,28 +651,21 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } private NodeIdentifier getElectedActiveCoordinatorNode(final boolean warnOnError) { - final String electedNodeAddress; + String electedNodeAddress; try { electedNodeAddress = getElectedActiveCoordinatorAddress(); } catch (final NoClusterCoordinatorException ncce) { logger.debug("There is currently no elected active Cluster Coordinator"); return null; - } catch (final IOException ioe) { - if (warnOnError) { - logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe); - if (logger.isDebugEnabled()) { - logger.warn("", ioe); - } - } - - return null; } - if (electedNodeAddress == null) { + if (electedNodeAddress == null || electedNodeAddress.trim().isEmpty()) { logger.debug("There is currently no elected active Cluster Coordinator"); return null; } + electedNodeAddress = electedNodeAddress.trim(); + final int colonLoc = electedNodeAddress.indexOf(':'); if (colonLoc < 1) { if (warnOnError) { @@ -519,6 +708,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus); if (existingStatus == null) { + onNodeAdded(connectionStatus.getNodeIdentifier(), true); return connectionStatus.getNodeIdentifier(); } else { return existingStatus.getNodeIdentifier(); @@ -594,7 +784,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // this method is called when something occurs that causes this node to change the status of the // node in question. We only use comparisons against the current value when we receive an update // about a node status from a different node, since those may be received out-of-order. - final NodeConnectionStatus currentStatus = nodeStatuses.put(nodeId, status); + final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status); final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState(); logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, status); logger.debug("State of cluster nodes is now {}", nodeStatuses); @@ -741,10 +931,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } @Override - public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException { + public ProtocolMessage handle(final ProtocolMessage protocolMessage, final Set<String> nodeIdentities) throws ProtocolException { switch (protocolMessage.getType()) { case CONNECTION_REQUEST: - return handleConnectionRequest((ConnectionRequestMessage) protocolMessage); + return handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities); case NODE_STATUS_CHANGE: handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage); return null; @@ -790,9 +980,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // Either remove the value from the map or update the map depending on the connection state if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) { - nodeStatuses.remove(nodeId, oldStatus); + if (removeNodeConditionally(nodeId, oldStatus)) { + storeState(); + } } else { - nodeStatuses.put(nodeId, updatedStatus); + updateNodeStatus(nodeId, updatedStatus); } logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); @@ -838,6 +1030,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // there is no node with that ID resolvedNodeId = proposedIdentifier; logger.debug("No existing node with ID {}; resolved node ID is as-proposed", proposedIdentifier.getId()); + onNodeAdded(resolvedNodeId, true); } else if (existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) { // there is a node with that ID but it's the same node. resolvedNodeId = proposedIdentifier; @@ -854,28 +1047,37 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl return resolvedNodeId; } - private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) { + private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage, final Set<String> nodeIdentities) { final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier(); - final NodeIdentifier withRequestorDn = addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()); + final NodeIdentifier withNodeIdentities = addNodeIdentities(proposedIdentifier, nodeIdentities); final DataFlow dataFlow = requestMessage.getConnectionRequest().getDataFlow(); - final ConnectionRequest requestWithDn = new ConnectionRequest(withRequestorDn, dataFlow); + final ConnectionRequest requestWithNodeIdentities = new ConnectionRequest(withNodeIdentities, dataFlow); // Resolve Node identifier. final NodeIdentifier resolvedNodeId = resolveNodeId(proposedIdentifier); + if (isBlockedByFirewall(nodeIdentities)) { + // if the socket address is not listed in the firewall, then return a null response + logger.info("Firewall blocked connection request from node " + resolvedNodeId + " with Node Identities " + nodeIdentities); + final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse(); + final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage(); + responseMessage.setConnectionResponse(response); + return responseMessage; + } + if (requireElection) { - final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withRequestorDn); + final DataFlow electedDataFlow = flowElection.castVote(dataFlow, withNodeIdentities); if (electedDataFlow == null) { - logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withRequestorDn); + logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", withNodeIdentities); return createFlowElectionInProgressResponse(); } else { - logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withRequestorDn); - return createConnectionResponse(requestWithDn, resolvedNodeId, electedDataFlow); + logger.info("Received Connection Request from {}; responding with DataFlow that was elected", withNodeIdentities); + return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId, electedDataFlow); } } - logger.info("Received Connection Request from {}; responding with my DataFlow", withRequestorDn); - return createConnectionResponse(requestWithDn, resolvedNodeId); + logger.info("Received Connection Request from {}; responding with my DataFlow", withNodeIdentities); + return createConnectionResponse(requestWithNodeIdentities, resolvedNodeId); } private ConnectionResponseMessage createFlowElectionInProgressResponse() { @@ -901,15 +1103,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private ConnectionResponseMessage createConnectionResponse(final ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final DataFlow clusterDataFlow) { - if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { - // if the socket address is not listed in the firewall, then return a null response - logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier); - final ConnectionResponse response = ConnectionResponse.createBlockedByFirewallResponse(); - final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage(); - responseMessage.setConnectionResponse(response); - return responseMessage; - } - if (clusterDataFlow == null) { final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage(); responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available")); @@ -936,11 +1129,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } - private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { + private NodeIdentifier addNodeIdentities(final NodeIdentifier nodeId, final Set<String> nodeIdentities) { return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), + nodeId.getLoadBalanceAddress(), nodeId.getLoadBalancePort(), nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), - nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn); + nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), nodeIdentities); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java new file mode 100644 index 0000000..eeabbe3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/state/NodeIdentifierDescriptor.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.node.state; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class NodeIdentifierDescriptor { + private String id; + private String apiAddress; + private int apiPort; + private String socketAddress; + private int socketPort; + private String loadBalanceAddress; + private int loadBalancePort; + private String siteToSiteAddress; + private Integer siteToSitePort; + private Integer siteToSiteHttpApiPort; + private Boolean siteToSiteSecure; + private Set<String> nodeIdentities; + private boolean localNodeIdentifier; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getApiAddress() { + return apiAddress; + } + + public void setApiAddress(final String apiAddress) { + this.apiAddress = apiAddress; + } + + public int getApiPort() { + return apiPort; + } + + public void setApiPort(final int apiPort) { + this.apiPort = apiPort; + } + + public String getSocketAddress() { + return socketAddress; + } + + public void setSocketAddress(final String socketAddress) { + this.socketAddress = socketAddress; + } + + public int getSocketPort() { + return socketPort; + } + + public void setSocketPort(final int socketPort) { + this.socketPort = socketPort; + } + + public String getLoadBalanceAddress() { + return loadBalanceAddress; + } + + public void setLoadBalanceAddress(final String loadBalanceAddress) { + this.loadBalanceAddress = loadBalanceAddress; + } + + public int getLoadBalancePort() { + return loadBalancePort; + } + + public void setLoadBalancePort(final int loadBalancePort) { + this.loadBalancePort = loadBalancePort; + } + + public String getSiteToSiteAddress() { + return siteToSiteAddress; + } + + public void setSiteToSiteAddress(final String siteToSiteAddress) { + this.siteToSiteAddress = siteToSiteAddress; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public void setSiteToSitePort(final Integer siteToSitePort) { + this.siteToSitePort = siteToSitePort; + } + + public Integer getSiteToSiteHttpApiPort() { + return siteToSiteHttpApiPort; + } + + public void setSiteToSiteHttpApiPort(final Integer siteToSiteHttpApiPort) { + this.siteToSiteHttpApiPort = siteToSiteHttpApiPort; + } + + public Boolean getSiteToSiteSecure() { + return siteToSiteSecure; + } + + public void setSiteToSiteSecure(final Boolean siteToSiteSecure) { + this.siteToSiteSecure = siteToSiteSecure; + } + + public Set<String> getNodeIdentities() { + return nodeIdentities; + } + + public void setNodeIdentities(final Set<String> nodeIdentities) { + this.nodeIdentities = Collections.unmodifiableSet(new HashSet<>(nodeIdentities)); + } + + public boolean isLocalNodeIdentifier() { + return localNodeIdentifier; + } + + public void setLocalNodeIdentifier(final boolean localNodeIdentifier) { + this.localNodeIdentifier = localNodeIdentifier; + } + + public static NodeIdentifierDescriptor fromNodeIdentifier(final NodeIdentifier nodeId, final boolean localNodeId) { + final NodeIdentifierDescriptor descriptor = new NodeIdentifierDescriptor(); + descriptor.setId(nodeId.getId()); + descriptor.setApiAddress(nodeId.getApiAddress()); + descriptor.setApiPort(nodeId.getApiPort()); + descriptor.setSocketAddress(nodeId.getSocketAddress()); + descriptor.setSocketPort(nodeId.getSocketPort()); + descriptor.setSiteToSiteAddress(nodeId.getSiteToSiteAddress()); + descriptor.setSiteToSitePort(nodeId.getSiteToSitePort()); + descriptor.setSiteToSiteHttpApiPort(nodeId.getSiteToSiteHttpApiPort()); + descriptor.setSiteToSiteSecure(nodeId.isSiteToSiteSecure()); + descriptor.setNodeIdentities(nodeId.getNodeIdentities()); + descriptor.setLoadBalanceAddress(nodeId.getLoadBalanceAddress()); + descriptor.setLoadBalancePort(nodeId.getLoadBalancePort()); + descriptor.setLocalNodeIdentifier(localNodeId); + return descriptor; + } + + public NodeIdentifier toNodeIdentifier() { + return new NodeIdentifier(getId(), getApiAddress(), getApiPort(), getSocketAddress(), getSocketPort(), getLoadBalanceAddress(), getLoadBalancePort(), + getSiteToSiteAddress(), getSiteToSitePort(), getSiteToSiteHttpApiPort(), getSiteToSiteSecure(), getNodeIdentities()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java index 7e3bc5d..a25fb4b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java @@ -17,10 +17,12 @@ package org.apache.nifi.cluster.manager; import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; import java.util.Map; +import java.util.Objects; public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionEntity>, ComponentEntityStatusMerger<ConnectionStatusDTO> { @@ -33,6 +35,22 @@ public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionE mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); } } + + // If Load Balancing is configured but client entity indicates that data is not being transferred, we need to check if any other + // node is actively transferring data. If Client Entity is transferring data, we already know the correct value for the Status, + // and if the Connection is not configured for Load Balancing, then we also know the correct value, so no need to look at all of + // the values of the other nodes. + if (clientEntity.getComponent() != null && ConnectionDTO.LOAD_BALANCE_INACTIVE.equals(clientEntity.getComponent().getLoadBalanceStatus())) { + final boolean anyActive = entityMap.values().stream() + .map(ConnectionEntity::getComponent) + .filter(Objects::nonNull) + .map(ConnectionDTO::getLoadBalanceStatus) + .anyMatch(status -> status.equals(ConnectionDTO.LOAD_BALANCE_ACTIVE)); + + if (anyActive) { + clientEntity.getComponent().setLoadBalanceStatus(ConnectionDTO.LOAD_BALANCE_ACTIVE); + } + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java index 1f163f4..826ecf7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorDiagnosticsEntityMerger.java @@ -17,22 +17,27 @@ package org.apache.nifi.cluster.manager; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO; import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; +import org.apache.nifi.web.api.dto.diagnostics.LocalQueuePartitionDTO; import org.apache.nifi.web.api.dto.diagnostics.NodeJVMDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; +import org.apache.nifi.web.api.dto.diagnostics.RemoteQueuePartitionDTO; import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<ProcessorDiagnosticsEntity> { private final long componentStatusSnapshotMillis; @@ -46,6 +51,11 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P final List<NodeJVMDiagnosticsSnapshotDTO> nodeJvmDiagnosticsSnapshots = new ArrayList<>(entityMap.size()); + // Merge connection diagnostics + mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getIncomingConnections()); + mergeConnectionDiagnostics(clientEntity, entityMap, entity -> entity.getComponent().getOutgoingConnections()); + + // Merge the Processor Statuses and create a separate NodeJVMDiagnosticsSnapshotDTO for each. We do both of these // together simply because we are already iterating over the entityMap and we have to create the Node-specific JVM diagnostics // before we start merging the values, in the second iteration over the map. @@ -99,7 +109,7 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P // Merge permissions on referenced controller services final Map<String, ControllerServiceEntity> serviceEntityById = clientDto.getReferencedControllerServices().stream() - .map(diagnosticsDto -> diagnosticsDto.getControllerService()) + .map(ControllerServiceDiagnosticsDTO::getControllerService) .collect(Collectors.toMap(ControllerServiceEntity::getId, Function.identity())); for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) { @@ -114,6 +124,129 @@ public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<P } } } + } + + private void mergeConnectionDiagnostics(final ProcessorDiagnosticsEntity clientEntity, final Map<NodeIdentifier, ProcessorDiagnosticsEntity> entityMap, + final Function<ProcessorDiagnosticsEntity, Set<ConnectionDiagnosticsDTO>> extractConnections) { + + final Map<String, List<ConnectionDiagnosticsSnapshotDTO>> snapshotByConnectionId = new HashMap<>(); + final Map<String, ConnectionDiagnosticsDTO> connectionById = new HashMap<>(); + + for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final ProcessorDiagnosticsEntity entity = entry.getValue(); + + final Set<ConnectionDiagnosticsDTO> connections = extractConnections.apply(entity); + for (final ConnectionDiagnosticsDTO connectionDiagnostics : connections) { + final String connectionId = connectionDiagnostics.getConnection().getId(); + final ConnectionDiagnosticsSnapshotDTO snapshot = connectionDiagnostics.getAggregateSnapshot(); + + snapshot.setNodeIdentifier(nodeId.getApiAddress() + ":" + nodeId.getApiPort()); + + final List<ConnectionDiagnosticsSnapshotDTO> snapshots = snapshotByConnectionId.computeIfAbsent(connectionId, id -> new ArrayList<>()); + snapshots.add(snapshot); + + if (entity == clientEntity){ + connectionById.put(connectionId, connectionDiagnostics); + } + } + } + + for (final Map.Entry<String, List<ConnectionDiagnosticsSnapshotDTO>> entry : snapshotByConnectionId.entrySet()) { + final String connectionId = entry.getKey(); + final List<ConnectionDiagnosticsSnapshotDTO> snapshots = entry.getValue(); + + final ConnectionDiagnosticsDTO dto = connectionById.get(connectionId); + dto.setNodeSnapshots(snapshots); + + dto.setAggregateSnapshot(mergeConnectionSnapshots(snapshots)); + } + } + + + + private ConnectionDiagnosticsSnapshotDTO mergeConnectionSnapshots(final List<ConnectionDiagnosticsSnapshotDTO> snapshots) { + final ConnectionDiagnosticsSnapshotDTO aggregate = new ConnectionDiagnosticsSnapshotDTO(); + + final Map<String, List<RemoteQueuePartitionDTO>> remotePartitionsByNodeId = new HashMap<>(); + + final LocalQueuePartitionDTO localPartition = new LocalQueuePartitionDTO(); + localPartition.setActiveQueueByteCount(0); + localPartition.setActiveQueueFlowFileCount(0); + localPartition.setAllActiveQueueFlowFilesPenalized(true); // set to true because we will update this value by AND'ing it with the snapshot value + localPartition.setAnyActiveQueueFlowFilesPenalized(false); // set to false because we will update this value by OR'ing it with the snapshot value + localPartition.setInFlightByteCount(0); + localPartition.setInFlightFlowFileCount(0); + localPartition.setSwapByteCount(0); + localPartition.setSwapFiles(0); + localPartition.setSwapFlowFileCount(0); + localPartition.setTotalByteCount(0); + localPartition.setTotalFlowFileCount(0); + + aggregate.setTotalByteCount(0L); + aggregate.setTotalFlowFileCount(0); + aggregate.setLocalQueuePartition(localPartition); + + for (final ConnectionDiagnosticsSnapshotDTO snapshot : snapshots) { + aggregate.setTotalByteCount(aggregate.getTotalByteCount() + snapshot.getTotalByteCount()); + aggregate.setTotalFlowFileCount(aggregate.getTotalFlowFileCount() + snapshot.getTotalFlowFileCount()); + + final LocalQueuePartitionDTO snapshotLocalPartition = snapshot.getLocalQueuePartition(); + localPartition.setActiveQueueByteCount(localPartition.getActiveQueueByteCount() + snapshotLocalPartition.getActiveQueueByteCount()); + localPartition.setActiveQueueFlowFileCount(localPartition.getActiveQueueFlowFileCount() + snapshotLocalPartition.getActiveQueueFlowFileCount()); + localPartition.setAllActiveQueueFlowFilesPenalized(localPartition.getAllActiveQueueFlowFilesPenalized() && snapshotLocalPartition.getAllActiveQueueFlowFilesPenalized()); + localPartition.setAnyActiveQueueFlowFilesPenalized(localPartition.getAnyActiveQueueFlowFilesPenalized() || snapshotLocalPartition.getAnyActiveQueueFlowFilesPenalized()); + localPartition.setInFlightByteCount(localPartition.getInFlightByteCount() + snapshotLocalPartition.getInFlightByteCount()); + localPartition.setInFlightFlowFileCount(localPartition.getInFlightFlowFileCount() + snapshotLocalPartition.getInFlightFlowFileCount()); + localPartition.setSwapByteCount(localPartition.getSwapByteCount() + snapshotLocalPartition.getSwapByteCount()); + localPartition.setSwapFiles(localPartition.getSwapFiles() + snapshotLocalPartition.getSwapFiles()); + localPartition.setSwapFlowFileCount(localPartition.getSwapFlowFileCount() + snapshotLocalPartition.getSwapFlowFileCount()); + localPartition.setTotalByteCount(localPartition.getTotalByteCount() + snapshotLocalPartition.getTotalByteCount()); + localPartition.setTotalFlowFileCount(localPartition.getTotalFlowFileCount() + snapshotLocalPartition.getTotalFlowFileCount()); + + for (final RemoteQueuePartitionDTO remoteQueuePartition : snapshot.getRemoteQueuePartitions()) { + final String nodeId = remoteQueuePartition.getNodeIdentifier(); + final List<RemoteQueuePartitionDTO> partitionsForNodeId = remotePartitionsByNodeId.computeIfAbsent(nodeId, key -> new ArrayList<>()); + partitionsForNodeId.add(remoteQueuePartition); + } + } + + final List<RemoteQueuePartitionDTO> mergedRemoteQueuePartitions = new ArrayList<>(); + for (final List<RemoteQueuePartitionDTO> partitions : remotePartitionsByNodeId.values()) { + final RemoteQueuePartitionDTO merged = mergeRemoteQueuePartitions(partitions); + mergedRemoteQueuePartitions.add(merged); + } + + aggregate.setRemoteQueuePartitions(mergedRemoteQueuePartitions); + + return aggregate; + } + + private RemoteQueuePartitionDTO mergeRemoteQueuePartitions(final List<RemoteQueuePartitionDTO> partitions) { + final RemoteQueuePartitionDTO merged = new RemoteQueuePartitionDTO(); + merged.setActiveQueueByteCount(0); + merged.setActiveQueueFlowFileCount(0); + merged.setInFlightByteCount(0); + merged.setInFlightFlowFileCount(0); + merged.setSwapByteCount(0); + merged.setSwapFiles(0); + merged.setSwapFlowFileCount(0); + merged.setTotalByteCount(0); + merged.setTotalFlowFileCount(0); + + for (final RemoteQueuePartitionDTO partition : partitions) { + merged.setActiveQueueByteCount(merged.getActiveQueueByteCount() + partition.getActiveQueueByteCount()); + merged.setActiveQueueFlowFileCount(merged.getActiveQueueFlowFileCount() + partition.getActiveQueueFlowFileCount()); + merged.setInFlightByteCount(merged.getInFlightByteCount() + partition.getInFlightByteCount()); + merged.setInFlightFlowFileCount(merged.getInFlightFlowFileCount() + partition.getInFlightFlowFileCount()); + merged.setSwapByteCount(merged.getSwapByteCount() + partition.getSwapByteCount()); + merged.setSwapFiles(merged.getSwapFiles() + partition.getSwapFiles()); + merged.setSwapFlowFileCount(merged.getSwapFlowFileCount() + partition.getSwapFlowFileCount()); + merged.setTotalByteCount(merged.getTotalByteCount() + partition.getTotalByteCount()); + merged.setTotalFlowFileCount(merged.getTotalFlowFileCount() + partition.getTotalFlowFileCount()); + merged.setNodeIdentifier(partition.getNodeIdentifier()); + } + return merged; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index 1915b9b..9c833b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -218,7 +218,7 @@ public class TestPopularVoteFlowElection { } private NodeIdentifier createNodeId(final int index) { - return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true); + return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true); } private DataFlow createDataFlow(final byte[] flow) { http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 50bdd0d..6ea019d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.heartbeat; import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -55,7 +56,7 @@ public class TestAbstractHeartbeatMonitor { @Before public void setup() throws Exception { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); - nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, null, false); + nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", 777, "localhost", null, null, false); } @After @@ -136,7 +137,7 @@ public class TestAbstractHeartbeatMonitor { @Test public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws Exception { final NodeIdentifier nodeId1 = nodeId; - final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", null, null, false); + final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", 5555, "localhost", null, null, false); final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter(); final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); @@ -272,7 +273,7 @@ public class TestAbstractHeartbeatMonitor { } @Override - public synchronized boolean isBlockedByFirewall(String hostname) { + public synchronized boolean isBlockedByFirewall(Set<String> nodeIds) { return false; } @@ -369,6 +370,14 @@ public class TestAbstractHeartbeatMonitor { public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException { return null; } + + @Override + public void registerEventListener(final ClusterTopologyEventListener eventListener) { + } + + @Override + public void unregisterEventListener(final ClusterTopologyEventListener eventListener) { + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy index f2d3a24..75b7ae3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy @@ -64,7 +64,8 @@ class StandardHttpResponseMapperSpec extends Specification { int n = it.node def response = Mock(Response) mockToRequestEntity.put response, it - new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId) + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, + requestId) } as Set when: @@ -102,7 +103,8 @@ class StandardHttpResponseMapperSpec extends Specification { ++n def response = Mock(Response) mockToRequestEntity.put response, it - new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId) + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'sktaddr', n * 11, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, + requestId) } as Set when: http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java index c1cfdf8..a93cd68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/CurrentUserEndpointMergerTest.java @@ -39,7 +39,7 @@ public class CurrentUserEndpointMergerTest { @Test public void testMergeUserPermissions() { - final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9002, 9003, false); + final NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 9000, "localhost", 9001, "localhost", 9006, "localhost", 9002, 9003, false); final CurrentUserEntity userNode1 = new CurrentUserEntity(); userNode1.setControllerPermissions(buildPermissions(true, false)); userNode1.setCountersPermissions(buildPermissions(true, true)); @@ -55,7 +55,7 @@ public class CurrentUserEndpointMergerTest { componentRestrictionsNode1.add(buildComponentRestriction(RequiredPermission.READ_FILESYSTEM, true, true)); userNode1.setComponentRestrictionPermissions(componentRestrictionsNode1); - final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); + final NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 8000, "localhost", 8001, "localhost", 9006,"localhost", 8002, 8003, false); final CurrentUserEntity userNode2 = new CurrentUserEntity(); userNode2.setControllerPermissions(buildPermissions(false, true)); userNode2.setCountersPermissions(buildPermissions(true, false)); http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy index 232c562..104e69b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMergerSpec.groovy @@ -57,7 +57,8 @@ class StatusHistoryEndpointMergerSpec extends Specification { ++n def response = Mock(Response) mockToRequestEntity.put response, it - new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, response, 500L, requestId) + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, null, n * 10, 'stsaddr', n * 100, n * 1000, false, null), + "GET", requestUri, response, 500L, requestId) } as Set when:
