http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterNodes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterNodes.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterNodes.java new file mode 100644 index 0000000..ad31e68 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterNodes.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol; + +import java.util.Collection; +import java.util.Collections; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Represents a set of Node Identifiers that identify all nodes that are in a NiFi Cluster + */ +@XmlRootElement(name = "ClusterNodes") +public class ClusterNodes { + private Collection<NodeIdentifier> nodeIds; + + public ClusterNodes(final Collection<NodeIdentifier> nodeIds) { + this.nodeIds = nodeIds; + } + + public Collection<NodeIdentifier> getNodeIdentifiers() { + return Collections.unmodifiableCollection(nodeIds); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java index 4e06926..96bde72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java @@ -31,7 +31,7 @@ import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter; @XmlJavaTypeAdapter(ConnectionResponseAdapter.class) public class ConnectionResponse { - private final boolean blockedByFirewall; + private final String rejectionReason; private final int tryLaterSeconds; private final NodeIdentifier nodeIdentifier; private final StandardDataFlow dataFlow; @@ -43,7 +43,7 @@ public class ConnectionResponse { private volatile String clusterManagerDN; public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, - final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { + final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { if (nodeIdentifier == null) { throw new IllegalArgumentException("Node identifier may not be empty or null."); } else if (dataFlow == null) { @@ -52,7 +52,7 @@ public class ConnectionResponse { this.nodeIdentifier = nodeIdentifier; this.dataFlow = dataFlow; this.tryLaterSeconds = 0; - this.blockedByFirewall = false; + this.rejectionReason = null; this.primary = primary; this.managerRemoteInputPort = managerRemoteInputPort; this.managerRemoteCommsSecure = managerRemoteCommsSecure; @@ -66,18 +66,18 @@ public class ConnectionResponse { this.dataFlow = null; this.nodeIdentifier = null; this.tryLaterSeconds = tryLaterSeconds; - this.blockedByFirewall = false; + this.rejectionReason = null; this.primary = false; this.managerRemoteInputPort = null; this.managerRemoteCommsSecure = null; this.instanceId = null; } - private ConnectionResponse() { + private ConnectionResponse(final String rejectionReason) { this.dataFlow = null; this.nodeIdentifier = null; this.tryLaterSeconds = 0; - this.blockedByFirewall = true; + this.rejectionReason = rejectionReason; this.primary = false; this.managerRemoteInputPort = null; this.managerRemoteCommsSecure = null; @@ -85,7 +85,15 @@ public class ConnectionResponse { } public static ConnectionResponse createBlockedByFirewallResponse() { - return new ConnectionResponse(); + return new ConnectionResponse("Blocked by Firewall"); + } + + public static ConnectionResponse createConflictingNodeIdResponse(final String otherNode) { + return new ConnectionResponse("The Node Identifier provided already belongs to node " + otherNode); + } + + public static ConnectionResponse createRejectionResponse(final String explanation) { + return new ConnectionResponse(explanation); } public boolean isPrimary() { @@ -96,8 +104,8 @@ public class ConnectionResponse { return tryLaterSeconds > 0; } - public boolean isBlockedByFirewall() { - return blockedByFirewall; + public String getRejectionReason() { + return rejectionReason; } public int getTryLaterSeconds() { @@ -135,5 +143,4 @@ public class ConnectionResponse { public String getClusterManagerDN() { return clusterManagerDN; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java index eff62b9..f48012a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java @@ -61,13 +61,30 @@ public class NodeIdentifier { */ private final int socketPort; + /** + * the IP or hostname that external clients should use to communicate with this node via Site-to-Site + */ + private final String siteToSiteAddress; + + /** + * the port that external clients should use to communicate with this node via Site-to-Site + */ + private final Integer siteToSitePort; + + /** + * whether or not site-to-site communications with this node are secure + */ + private Boolean siteToSiteSecure; + private final String nodeDn; - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) { - this(id, apiAddress, apiPort, socketAddress, socketPort, null); + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, + final String siteToSiteAddress, final Integer siteToSitePort, final boolean siteToSiteSecure) { + this(id, apiAddress, apiPort, socketAddress, socketPort, siteToSiteAddress, siteToSitePort, siteToSiteSecure, null); } - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) { + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, + final String siteToSiteAddress, final Integer siteToSitePort, final boolean siteToSiteSecure, final String dn) { if (StringUtils.isBlank(id)) { throw new IllegalArgumentException("Node ID may not be empty or null."); @@ -79,6 +96,9 @@ public class NodeIdentifier { validatePort(apiPort); validatePort(socketPort); + if (siteToSitePort != null) { + validatePort(siteToSitePort); + } this.id = id; this.apiAddress = apiAddress; @@ -86,6 +106,9 @@ public class NodeIdentifier { this.socketAddress = socketAddress; this.socketPort = socketPort; this.nodeDn = dn; + this.siteToSiteAddress = siteToSiteAddress; + this.siteToSitePort = siteToSitePort; + this.siteToSiteSecure = siteToSiteSecure; } public String getId() { @@ -118,6 +141,19 @@ public class NodeIdentifier { } } + public String getSiteToSiteAddress() { + return siteToSiteAddress; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public boolean isSiteToSiteSecure() { + return siteToSiteSecure; + } + + /** * Compares the id of two node identifiers for equality. * @@ -165,6 +201,7 @@ public class NodeIdentifier { if (this.socketPort != other.socketPort) { return false; } + return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java index 6046702..4243b41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java @@ -27,7 +27,7 @@ public class AdaptedConnectionResponse { private StandardDataFlow dataFlow; private NodeIdentifier nodeIdentifier; - private boolean blockedByFirewall; + private String rejectionReason; private boolean primary; private int tryLaterSeconds; private Integer managerRemoteInputPort; @@ -63,12 +63,12 @@ public class AdaptedConnectionResponse { this.tryLaterSeconds = tryLaterSeconds; } - public boolean isBlockedByFirewall() { - return blockedByFirewall; + public String getRejectionReason() { + return rejectionReason; } - public void setBlockedByFirewall(boolean blockedByFirewall) { - this.blockedByFirewall = blockedByFirewall; + public void setRejectionReason(final String rejectionReason) { + this.rejectionReason = rejectionReason; } public boolean isPrimary() { http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java index 3bbf7b6..beca014 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java @@ -21,14 +21,13 @@ package org.apache.nifi.cluster.protocol.jaxb.message; public class AdaptedNodeIdentifier { private String id; - private String apiAddress; - private int apiPort; - private String socketAddress; - private int socketPort; + private String siteToSiteAddress; + private Integer siteToSitePort; + private boolean siteToSiteSecure; public AdaptedNodeIdentifier() { } @@ -73,4 +72,28 @@ public class AdaptedNodeIdentifier { this.socketPort = socketPort; } + public String getSiteToSiteAddress() { + return siteToSiteAddress; + } + + public void setSiteToSiteAddress(String siteToSiteAddress) { + this.siteToSiteAddress = siteToSiteAddress; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public void setSiteToSitePort(Integer siteToSitePort) { + this.siteToSitePort = siteToSitePort; + } + + + public boolean isSiteToSiteSecure() { + return siteToSiteSecure; + } + + public void setSiteToSiteSecure(boolean siteToSiteSecure) { + this.siteToSiteSecure = siteToSiteSecure; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java index baabc33..b2c1c67 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java @@ -30,7 +30,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo aCr.setDataFlow(cr.getDataFlow()); aCr.setNodeIdentifier(cr.getNodeIdentifier()); aCr.setTryLaterSeconds(cr.getTryLaterSeconds()); - aCr.setBlockedByFirewall(cr.isBlockedByFirewall()); + aCr.setRejectionReason(cr.getRejectionReason()); aCr.setPrimary(cr.isPrimary()); aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort()); aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure()); @@ -43,11 +43,11 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) { if (aCr.shouldTryLater()) { return new ConnectionResponse(aCr.getTryLaterSeconds()); - } else if (aCr.isBlockedByFirewall()) { - return ConnectionResponse.createBlockedByFirewallResponse(); + } else if (aCr.getRejectionReason() != null) { + return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason()); } else { return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), aCr.isPrimary(), - aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId()); + aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java index da54658..b040cd4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java @@ -34,6 +34,9 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod aNi.setApiPort(ni.getApiPort()); aNi.setSocketAddress(ni.getSocketAddress()); aNi.setSocketPort(ni.getSocketPort()); + aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress()); + aNi.setSiteToSitePort(ni.getSiteToSitePort()); + aNi.setSiteToSiteSecure(ni.isSiteToSiteSecure()); return aNi; } } @@ -43,7 +46,8 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod if (aNi == null) { return null; } else { - return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort()); + return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), + aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(), aNi.isSiteToSiteSecure()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java index 5bfb5ed..82d0959 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java @@ -87,7 +87,7 @@ public class ClusterManagerProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage()); FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port, "localhost", 3821, false)); FlowResponseMessage response = sender.requestFlow(request); assertNotNull(response); } @@ -98,7 +98,7 @@ public class ClusterManagerProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port, "localhost", 3821, false)); try { sender.requestFlow(request); fail("failed to throw exception"); @@ -122,7 +122,7 @@ public class ClusterManagerProtocolSenderImplTest { } }); FlowRequestMessage request = new FlowRequestMessage(); - request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port)); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port, "localhost", 3821, false)); try { sender.requestFlow(request); fail("failed to throw exception"); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java index 19834ae..94c0a20 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java @@ -47,7 +47,6 @@ import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.multicast.DiscoverableService; import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; - import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -80,7 +79,7 @@ public class NodeProtocolSenderImplTest { mockServiceLocator = mock(ClusterServiceLocator.class); mockHandler = mock(ProtocolHandler.class); - nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678); + nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678, "localhost", 3821, false); ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); @@ -110,7 +109,7 @@ public class NodeProtocolSenderImplTest { when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, - new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); + new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); ConnectionRequestMessage request = new ConnectionRequestMessage(); @@ -178,7 +177,7 @@ public class NodeProtocolSenderImplTest { when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); HeartbeatMessage hb = new HeartbeatMessage(); - hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[]{1, 2, 3})); + hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4, "localhost", 3821, false), false, false, new byte[] {1, 2, 3})); sender.heartbeat(hb); } @@ -190,7 +189,7 @@ public class NodeProtocolSenderImplTest { when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage(); - msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1)); + msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1, "localhost", 3821, false)); msg.setExceptionMessage("some exception"); sender.notifyControllerStartupFailure(msg); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java index 510579f..9c61f95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/Event.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.event; import java.util.Date; + import org.apache.commons.lang3.StringUtils; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java index c9cd6a5..d949100 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; + import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.EventManager; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java index 8ba33c5..d8e9b07 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java @@ -24,10 +24,11 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; + import org.apache.commons.net.util.SubnetUtils; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; -import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java index 5a15b4b..0f9872f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java @@ -16,19 +16,19 @@ */ package org.apache.nifi.cluster.manager; -import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; -import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException; import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException; +import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import java.net.URI; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Extends the ClusterManager interface to define how requests issued to the cluster manager are federated to the nodes. Specifically, the HTTP protocol is used for communicating requests to the * cluster manager and to the nodes. http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java index 470c6ba..3a3d976 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java @@ -16,11 +16,12 @@ */ package org.apache.nifi.cluster.manager; -import org.apache.nifi.cluster.manager.exception.UriConstructionException; import java.net.URI; import java.util.List; import java.util.Map; import java.util.Set; + +import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java index fda9ecf..8f56bbb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.manager; import java.net.URI; import java.util.Map; import java.util.Set; + import org.apache.nifi.cluster.node.Node.Status; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java index 2789f78..ec365aa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.cluster.manager; -import com.sun.jersey.api.client.ClientResponse; import java.io.BufferedInputStream; import java.io.IOException; import java.io.OutputStream; @@ -31,14 +30,16 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.web.api.entity.Entity; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.entity.Entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.ClientResponse; + /** * Encapsulates a node's response in regards to receiving a external API request. * http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConflictingNodeIdException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConflictingNodeIdException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConflictingNodeIdException.java new file mode 100644 index 0000000..7c8c45d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConflictingNodeIdException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.manager.exception; + +public class ConflictingNodeIdException extends Exception { + private static final long serialVersionUID = 1L; + + private final String nodeId; + private final String conflictingNodeAddress; + private final int conflictingNodePort; + + public ConflictingNodeIdException(final String nodeId, final String conflictingNodeAddress, final int conflictingNodePort) { + super("Node Identifier " + nodeId + " conflicts with existing node " + conflictingNodeAddress + ":" + conflictingNodePort); + + this.nodeId = nodeId; + this.conflictingNodeAddress = conflictingNodeAddress; + this.conflictingNodePort = conflictingNodePort; + } + + public String getNodeId() { + return nodeId; + } + + public String getConflictingNodeAddress() { + return conflictingNodeAddress; + } + + public int getConflictingNodePort() { + return conflictingNodePort; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java index 4c48d77..d3d5559 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.cluster.manager.impl; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -30,10 +34,6 @@ import org.apache.nifi.provenance.search.QuerySubmission; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.reporting.EventAccess; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - public class ClusteredEventAccess implements EventAccess { private final WebClusterManager clusterManager; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java index 4a85b5b..7f176b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java @@ -25,6 +25,7 @@ import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -47,13 +48,15 @@ public class ClusteredReportingContext implements ReportingContext { private final ControllerServiceProvider serviceProvider; private final Map<PropertyDescriptor, String> properties; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; + private final StateManager stateManager; - public ClusteredReportingContext(final EventAccess eventAccess, final BulletinRepository bulletinRepository, - final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider) { + public ClusteredReportingContext(final EventAccess eventAccess, final BulletinRepository bulletinRepository, final Map<PropertyDescriptor, String> properties, + final ControllerServiceProvider serviceProvider, final StateManager stateManager) { this.eventAccess = eventAccess; this.bulletinRepository = bulletinRepository; this.properties = Collections.unmodifiableMap(properties); this.serviceProvider = serviceProvider; + this.stateManager = stateManager; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { @@ -206,4 +209,9 @@ public class ClusteredReportingContext implements ReportingContext { return null; } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java index 32181e3..f86c290 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.cluster.manager.impl; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.UniformInterfaceException; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; -import com.sun.jersey.core.util.MultivaluedMapImpl; - import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -47,17 +39,24 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.manager.HttpRequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.util.FormatUtils; - -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; +import com.sun.jersey.core.util.MultivaluedMapImpl; + /** * An implementation of the <code>HttpRequestReplicator</code> interface. This implementation parallelizes the node HTTP requests using the given <code>ExecutorService</code> instance. Individual * requests may have connection and read timeouts set, which may be set during instance construction. Otherwise, the default is not to timeout. http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java index c11df05..098573d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Set; + import org.apache.nifi.cluster.manager.HttpResponseMapper; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.node.Node; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 55e58ac..95cac50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -87,6 +87,7 @@ import org.apache.nifi.cluster.manager.HttpClusterManager; import org.apache.nifi.cluster.manager.HttpRequestReplicator; import org.apache.nifi.cluster.manager.HttpResponseMapper; import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException; import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; @@ -126,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowFileSummaries; import org.apache.nifi.controller.Heartbeater; @@ -151,6 +153,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.controller.service.StandardControllerServiceProvider; +import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; import org.apache.nifi.controller.status.history.ComponentStatusRepository; @@ -371,8 +374,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C private final FlowEngine reportingTaskEngine; private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>(); private final StandardProcessScheduler processScheduler; + private final StateManagerProvider stateManagerProvider; private final long componentStatusSnapshotMillis; + public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper, final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener, final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) { @@ -468,11 +473,17 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread"); + try { + this.stateManagerProvider = StandardStateManagerProvider.create(properties); + } catch (final IOException e) { + throw new RuntimeException(e); + } + processScheduler = new StandardProcessScheduler(new Heartbeater() { @Override public void heartbeat() { } - }, this, encryptor); + }, this, encryptor, stateManagerProvider); // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. @@ -481,13 +492,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository); + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); } public void start() throws IOException { writeLock.lock(); try { - if (isRunning()) { throw new IllegalStateException("Instance is already started."); } @@ -712,7 +722,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C try { // resolve the proposed node identifier to a valid node identifier - final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier()); + final NodeIdentifier resolvedNodeIdentifier; + try { + resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier()); + } catch (final ConflictingNodeIdException e) { + logger.info("Rejecting node {} from connecting to cluster because it provided a Node ID of {} but that Node ID already belongs to {}:{}", + request.getProposedNodeIdentifier().getSocketAddress(), request.getProposedNodeIdentifier().getId(), e.getConflictingNodeAddress(), e.getConflictingNodePort()); + return ConnectionResponse.createConflictingNodeIdResponse(e.getConflictingNodeAddress() + ":" + e.getConflictingNodePort()); + } if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) { // if the socket address is not listed in the firewall, then return a null response @@ -1029,7 +1046,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) { if (entry.getValue() != null) { - reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue()); + reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue(), false); } } @@ -1096,7 +1113,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler, - new ClusteredEventAccess(this, auditService), bulletinRepository, controllerServiceProvider, validationContextFactory); + new ClusteredEventAccess(this, auditService), bulletinRepository, controllerServiceProvider, + validationContextFactory, stateManagerProvider.getStateManager(id)); taskNode.setName(task.getClass().getSimpleName()); reportingTasks.put(id, taskNode); @@ -1354,8 +1372,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { - return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), - nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), dn); + return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), + nodeId.getSocketAddress(), nodeId.getSocketPort(), + nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), nodeId.isSiteToSiteSecure(), dn); } private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) { @@ -1848,6 +1867,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } } + private ComponentStatusRepository createComponentStatusRepository() { final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -3644,7 +3664,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C * * @return the node identifier that should be used */ - private NodeIdentifier resolveProposedNodeIdentifier(final NodeIdentifier proposedNodeId) { + private NodeIdentifier resolveProposedNodeIdentifier(final NodeIdentifier proposedNodeId) throws ConflictingNodeIdException { readLock.lock(); try { for (final Node node : nodes) { @@ -3660,32 +3680,32 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C // we know about this node and it has the same ID, so the proposal is fine return proposedNodeId; } else if (sameId && !sameServiceCoordinates) { - // proposed ID conflicts with existing node ID, so assign a new ID - final NodeIdentifier resolvedIdentifier = new NodeIdentifier( - UUID.randomUUID().toString(), - proposedNodeId.getApiAddress(), - proposedNodeId.getApiPort(), - proposedNodeId.getSocketAddress(), - proposedNodeId.getSocketPort()); - logger.info(String.format("Using Node Identifier %s because proposed node identifier %s conflicts existing node identifiers", - resolvedIdentifier, proposedNodeId)); - return resolvedIdentifier; + throw new ConflictingNodeIdException(nodeId.getId(), node.getNodeId().getApiAddress(), node.getNodeId().getApiPort()); } else if (!sameId && sameServiceCoordinates) { // we know about this node, so we'll use the existing ID - logger.debug(String.format("Using Node Identifier %s because proposed node identifier %s matches the service coordinates", - nodeId, proposedNodeId)); - return nodeId; + logger.debug(String.format("Using Node Identifier %s because proposed node identifier %s matches the service coordinates", nodeId, proposedNodeId)); + + // return a new Node Identifier that uses the existing Node UUID, Node Index, and ZooKeeper Port from the existing Node (because these are the + // elements that are assigned by the NCM), but use the other parameters from the proposed identifier, since these elements are determined by + // the node rather than the NCM. + return new NodeIdentifier(nodeId.getId(), + proposedNodeId.getApiAddress(), proposedNodeId.getApiPort(), + proposedNodeId.getSocketAddress(), proposedNodeId.getSocketPort(), + proposedNodeId.getSiteToSiteAddress(), proposedNodeId.getSiteToSitePort(), proposedNodeId.isSiteToSiteSecure()); } } - // proposal does not conflict with existing nodes - return proposedNodeId; + // proposal does not conflict with existing nodes - this is a new node. Assign a new Node Index to it + return new NodeIdentifier(proposedNodeId.getId(), proposedNodeId.getApiAddress(), proposedNodeId.getApiPort(), + proposedNodeId.getSocketAddress(), proposedNodeId.getSocketPort(), + proposedNodeId.getSiteToSiteAddress(), proposedNodeId.getSiteToSitePort(), proposedNodeId.isSiteToSiteSecure()); } finally { readLock.unlock("resolveProposedNodeIdentifier"); } } + private boolean isHeartbeatMonitorRunning() { readLock.lock(); try { @@ -3879,13 +3899,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C continue; } - final Integer siteToSitePort = heartbeat.getSiteToSitePort(); + final Integer siteToSitePort = id.getSiteToSitePort(); if (siteToSitePort == null) { continue; } final int flowFileCount = (int) heartbeat.getTotalFlowFileCount(); - final NodeInformation nodeInfo = new NodeInformation(id.getApiAddress(), siteToSitePort, id.getApiPort(), - heartbeat.isSiteToSiteSecure(), flowFileCount); + final NodeInformation nodeInfo = new NodeInformation(id.getSiteToSiteAddress(), siteToSitePort, id.getApiPort(), + id.isSiteToSiteSecure(), flowFileCount); nodeInfos.add(nodeInfo); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java index 95da615..3bb3c1a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java @@ -22,7 +22,6 @@ import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.Heartbeat; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java index 2136dad..c51e061 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java @@ -25,7 +25,6 @@ import org.apache.nifi.io.socket.multicast.DiscoverableService; import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; - import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.FactoryBean; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java index ef72298..9d064eb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.spring; import java.io.File; + import org.apache.nifi.cluster.firewall.impl.FileBasedClusterNodeFirewall; import org.apache.nifi.util.NiFiProperties; import org.springframework.beans.factory.FactoryBean; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java index 1ed5b30..a23cfdd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/controller/reporting/ClusteredReportingTaskNode.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.reporting; import org.apache.nifi.cluster.manager.impl.ClusteredReportingContext; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -30,20 +31,22 @@ public class ClusteredReportingTaskNode extends AbstractReportingTaskNode { private final EventAccess eventAccess; private final BulletinRepository bulletinRepository; private final ControllerServiceProvider serviceProvider; + private final StateManager stateManager; public ClusteredReportingTaskNode(final ReportingTask reportingTask, final String id, final ProcessScheduler scheduler, final EventAccess eventAccess, final BulletinRepository bulletinRepository, final ControllerServiceProvider serviceProvider, - final ValidationContextFactory validationContextFactory) { + final ValidationContextFactory validationContextFactory, final StateManager stateManager) { super(reportingTask, id, serviceProvider, scheduler, validationContextFactory); this.eventAccess = eventAccess; this.bulletinRepository = bulletinRepository; this.serviceProvider = serviceProvider; + this.stateManager = stateManager; } @Override public ReportingContext getReportingContext() { - return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider); + return new ClusteredReportingContext(eventAccess, bulletinRepository, getProperties(), serviceProvider, stateManager); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java index e823d27..6487a20 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/event/impl/EventManagerImplTest.java @@ -16,15 +16,17 @@ */ package org.apache.nifi.cluster.event.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.util.Arrays; import java.util.Collections; import java.util.List; + import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.Event.Category; import org.apache.nifi.cluster.event.EventManager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java index b5f76fb..55c8768 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewallTest.java @@ -16,13 +16,15 @@ */ package org.apache.nifi.cluster.firewall.impl; -import java.io.File; -import java.net.InetAddress; -import java.net.UnknownHostException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; + +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; + import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java index b99cbea..e526ea3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java @@ -30,6 +30,7 @@ import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; +import org.apache.commons.io.FileUtils; import org.apache.nifi.cluster.flow.DataFlowDao; import org.apache.nifi.cluster.flow.PersistedFlowState; import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; @@ -46,8 +47,6 @@ import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.io.socket.ServerSocketConfiguration; import org.apache.nifi.io.socket.SocketConfiguration; - -import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -148,7 +147,7 @@ public class DataFlowManagementServiceImplTest { byte[] flowBytes = flowStr.getBytes(); listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); service.setPersistedFlowState(PersistedFlowState.STALE); @@ -168,8 +167,8 @@ public class DataFlowManagementServiceImplTest { String flowStr = "<rootGroup />"; listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); service.setPersistedFlowState(PersistedFlowState.STALE); @@ -196,8 +195,8 @@ public class DataFlowManagementServiceImplTest { byte[] flowBytes = flowStr.getBytes(); listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); service.setPersistedFlowState(PersistedFlowState.STALE); @@ -217,8 +216,8 @@ public class DataFlowManagementServiceImplTest { byte[] flowBytes = flowStr.getBytes(); listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); for (int i = 0; i < 1000; i++) { service.setNodeIds(new HashSet<>(Arrays.asList(nodeId1, nodeId2))); @@ -239,8 +238,8 @@ public class DataFlowManagementServiceImplTest { String flowStr = "<rootGroup />"; listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1); - NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false); + NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setRetrievalDelay("5 sec"); for (int i = 0; i < 1000; i++) { @@ -263,9 +262,9 @@ public class DataFlowManagementServiceImplTest { listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); Set<NodeIdentifier> nodeIds = new HashSet<>(); for (int i = 0; i < 1000; i++) { - nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1)); + nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false)); } - nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort)); + nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false)); long lastRetrievalTime = service.getLastRetrievalTime(); @@ -291,7 +290,7 @@ public class DataFlowManagementServiceImplTest { String flowStr = "<rootGroup />"; byte[] flowBytes = flowStr.getBytes(); listener.addHandler(new FlowRequestProtocolHandler(new StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0]))); - NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort); + NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", apiDummyPort, "localhost", socketPort, "localhost", 1234, false); service.setNodeIds(new HashSet<>(Arrays.asList(nodeId))); service.setPersistedFlowState(PersistedFlowState.STALE); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java index b02eac0..e3e9ac6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImplTest.java @@ -16,35 +16,39 @@ */ package org.apache.nifi.cluster.manager.impl; -import javax.ws.rs.core.Response; -import javax.xml.bind.annotation.XmlRootElement; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MultivaluedMap; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.Iterator; -import javax.ws.rs.core.StreamingOutput; -import org.apache.nifi.cluster.manager.testutils.HttpResponse; -import org.apache.nifi.cluster.manager.testutils.HttpServer; -import com.sun.jersey.api.client.Client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import javax.xml.bind.annotation.XmlRootElement; + import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.testutils.HttpResponse; +import org.apache.nifi.cluster.manager.testutils.HttpResponseAction; +import org.apache.nifi.cluster.manager.testutils.HttpServer; +import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.apache.nifi.cluster.manager.testutils.HttpResponseAction; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; + +import com.sun.jersey.api.client.Client; /** */ @@ -341,7 +345,7 @@ public class HttpRequestReplicatorImplTest { private Set<NodeIdentifier> createNodes(int num, String host, int apiPort) { Set<NodeIdentifier> result = new HashSet<>(); for (int i = 0; i < num; i++) { - result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1)); + result.add(new NodeIdentifier(String.valueOf(i), host, apiPort, host, 1, "localhost", 1234, false)); } return result; } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java index ebea63a..ba6ca82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImplTest.java @@ -16,23 +16,26 @@ */ package org.apache.nifi.cluster.manager.impl; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.core.util.MultivaluedMapImpl; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.ByteArrayInputStream; -import java.util.Map; -import java.util.HashSet; -import java.util.Set; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.node.Node.Status; import org.apache.nifi.cluster.protocol.NodeIdentifier; -import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.core.util.MultivaluedMapImpl; /** */ @@ -119,7 +122,7 @@ public class HttpResponseMapperImplTest { when(clientResponse.getHeaders()).thenReturn(new MultivaluedMapImpl()); when(clientResponse.getEntityInputStream()).thenReturn(new ByteArrayInputStream(new byte[0])); - NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1); + NodeIdentifier nodeIdentifier = new NodeIdentifier(nodeId, "localhost", 1, "localhost", 1, "localhost", 1234, false); return new NodeResponse(nodeIdentifier, "GET", dummyUri, clientResponse, 1L, "111"); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java index 13a192f..55c6c31 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/impl/TestWebClusterManager.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.cluster.manager.impl; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; - import static org.junit.Assert.assertEquals; import java.text.DateFormat; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java index 4c3eeee..89c9a0a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java @@ -25,8 +25,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; + import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; + import org.apache.commons.lang3.StringUtils; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java index 7ae4806..26c892d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.manager.testutils; import java.util.Collections; import java.util.HashMap; import java.util.Map; + import javax.ws.rs.core.Response.Status; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java index 3621475..e9cbd1d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; + import org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
