http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java index a4eb46e..9a53a72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java @@ -34,6 +34,7 @@ public class AdaptedConnectionResponse { private String rejectionReason; private int tryLaterSeconds; private Integer managerRemoteInputPort; + private Integer managerRemoteInputHttpPort; private Boolean managerRemoteCommsSecure; private String instanceId; private List<NodeConnectionStatus> nodeStatuses; @@ -88,6 +89,14 @@ public class AdaptedConnectionResponse { return managerRemoteInputPort; } + public void setManagerRemoteInputHttpPort(Integer managerRemoteInputHttpPort) { + this.managerRemoteInputHttpPort = managerRemoteInputHttpPort; + } + + public Integer getManagerRemoteInputHttpPort() { + return managerRemoteInputHttpPort; + } + public void setManagerRemoteCommsSecure(Boolean secure) { this.managerRemoteCommsSecure = secure; }
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java index beca014..a2d9968 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java @@ -27,6 +27,8 @@ public class AdaptedNodeIdentifier { private int socketPort; private String siteToSiteAddress; private Integer siteToSitePort; + private Integer siteToSiteHttpApiPort; + private boolean siteToSiteSecure; public AdaptedNodeIdentifier() { @@ -96,4 +98,13 @@ public class AdaptedNodeIdentifier { public void setSiteToSiteSecure(boolean siteToSiteSecure) { this.siteToSiteSecure = siteToSiteSecure; } + + public Integer getSiteToSiteHttpApiPort() { + return siteToSiteHttpApiPort; + } + + public void setSiteToSiteHttpApiPort(Integer siteToSiteHttpApiPort) { + this.siteToSiteHttpApiPort = siteToSiteHttpApiPort; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java ---------------------------------------------------------------------- diff 
--git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java index ca98a86..cf64e71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java @@ -32,6 +32,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo aCr.setTryLaterSeconds(cr.getTryLaterSeconds()); aCr.setRejectionReason(cr.getRejectionReason()); aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort()); + aCr.setManagerRemoteInputHttpPort(cr.getManagerRemoteInputHttpPort()); aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure()); aCr.setInstanceId(cr.getInstanceId()); aCr.setNodeConnectionStatuses(cr.getNodeConnectionStatuses()); @@ -48,7 +49,7 @@ public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionRespo return ConnectionResponse.createRejectionResponse(aCr.getRejectionReason()); } else { return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), - aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), + aCr.getManagerRemoteInputPort(), aCr.getManagerRemoteInputHttpPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId(), aCr.getNodeConnectionStatuses(), aCr.getComponentRevisions()); } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java index b040cd4..4a2660f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java @@ -36,6 +36,7 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod aNi.setSocketPort(ni.getSocketPort()); aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress()); aNi.setSiteToSitePort(ni.getSiteToSitePort()); + aNi.setSiteToSiteHttpApiPort(ni.getSiteToSiteHttpApiPort()); aNi.setSiteToSiteSecure(ni.isSiteToSiteSecure()); return aNi; } @@ -47,7 +48,7 @@ public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, Nod return null; } else { return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(), - aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(), aNi.isSiteToSiteSecure()); + aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(),aNi.getSiteToSiteHttpApiPort(), aNi.isSiteToSiteSecure()); } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java index 4208498..008b586 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java @@ -36,6 +36,7 @@ public class ReconnectionRequestMessage extends ProtocolMessage { private StandardDataFlow dataFlow; private boolean primary; private Integer managerRemoteSiteListeningPort; + private Integer managerRemoteSiteListeningHttpPort; private Boolean managerRemoteSiteCommsSecure; private String instanceId; private List<NodeConnectionStatus> nodeStatuses; @@ -82,6 +83,14 @@ public class ReconnectionRequestMessage extends ProtocolMessage { return managerRemoteSiteListeningPort; } + public void setManagerRemoteSiteListeningHttpPort(Integer managerRemoteSiteListeningHttpPort) { + this.managerRemoteSiteListeningHttpPort = managerRemoteSiteListeningHttpPort; + } + + public Integer getManagerRemoteSiteListeningHttpPort() { + return managerRemoteSiteListeningHttpPort; + } + public void setManagerRemoteSiteCommsSecure(final Boolean remoteSiteCommsSecure) { this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java index 25ab73a..8edabd2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java @@ -45,11 +45,11 @@ public class TestJaxbProtocolUtils { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ConnectionResponseMessage msg = new ConnectionResponseMessage(); - final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, true); + final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true); final DataFlow dataFlow = new StandardDataFlow(new byte[0], new byte[0]); final List<NodeConnectionStatus> nodeStatuses = Collections.singletonList(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); final List<ComponentRevision> componentRevisions = Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, "client-1", "component-1"))); - msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 80, false, "instance-1", nodeStatuses, componentRevisions)); + 
msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 9990, 8080, false, "instance-1", nodeStatuses, componentRevisions)); JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos); final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 1a8c59e..158468a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -620,8 +620,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } else { // there is a node with that ID and it's a different node resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(), - proposedIdentifier.getSocketAddress(), proposedIdentifier.getSocketPort(), - proposedIdentifier.getSiteToSiteAddress(), proposedIdentifier.getSiteToSitePort(), proposedIdentifier.isSiteToSiteSecure()); + proposedIdentifier.getSocketAddress(), proposedIdentifier.getSocketPort(), proposedIdentifier.getSiteToSiteAddress(), + 
proposedIdentifier.getSiteToSitePort(), proposedIdentifier.getSiteToSiteHttpApiPort(), proposedIdentifier.isSiteToSiteSecure()); logger.debug("A node already exists with ID {}. Proposed Node Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is {}", proposedIdentifier.getId(), proposedIdentifier, getNodeIdentifier(proposedIdentifier.getId()), resolvedNodeId); } @@ -682,7 +682,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl // TODO: Remove the 'null' values here from the ConnectionResponse all together. These // will no longer be needed for site-to-site once the NCM is gone. - return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, instanceId, + return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, null, null, instanceId, new ArrayList<>(nodeStatuses.values()), revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList())); } @@ -690,7 +690,8 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) { return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), - nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), nodeId.isSiteToSiteSecure(), dn); + nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), + nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index d8c4e19..7640787 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -51,7 +51,7 @@ public class TestAbstractHeartbeatMonitor { @Before public void setup() throws Exception { - nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, false); + nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, null, false); } @After http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java index f4d5d2e..f7890b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java @@ -36,14 +36,14 @@ public class TestProcessorEndpointMerger { final ProcessorEndpointMerger merger = new ProcessorEndpointMerger(); final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>(); - final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", "localhost", 9000, "localhost", 9001, "localhost", 9002, false); + final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", "localhost", 9000, "localhost", 9001, "localhost", 9002, 9003, false); final List<String> nodeValidationErrors1234 = new ArrayList<>(); nodeValidationErrors1234.add("error 1"); nodeValidationErrors1234.add("error 2"); merger.mergeValidationErrors(validationErrorMap, nodeId1234, nodeValidationErrors1234); - final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); + final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); final List<String> nodeValidationErrorsXyz = new ArrayList<>(); nodeValidationErrorsXyz.add("error 1"); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java index c97db03..cc71b9b 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java @@ -40,10 +40,10 @@ public class TestResponseUtils { @Test public void testFindLongResponseTimes() throws URISyntaxException { final Map<NodeIdentifier, NodeResponse> responses = new HashMap<>(); - final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); - final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false); - final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, false); - final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, false); + final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); + final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false); + final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, 8303, false); + final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, 8403, false); final URI uri = new URI("localhost:8080"); final ClientResponse clientResponse = Mockito.mock(ClientResponse.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 0bce521..a9c0af9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -82,7 +82,7 @@ public class TestThreadPoolRequestReplicator { public void testResponseRemovedWhenCompletedAndFetched() { withReplicator(replicator -> { final Set<NodeIdentifier> nodeIds = new HashSet<>(); - nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false)); + nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false)); final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); @@ -109,7 +109,7 @@ public class TestThreadPoolRequestReplicator { public void testLongWaitForResponse() { withReplicator(replicator -> { final Set<NodeIdentifier> nodeIds = new HashSet<>(); - final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, false); + final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false); nodeIds.add(nodeId); final URI uri = new URI("http://localhost:8080/processors/1"); final Entity entity = new ProcessorEntity(); @@ -138,10 +138,10 @@ public class TestThreadPoolRequestReplicator { public void testCompleteOnError() { 
withReplicator(replicator -> { final Set<NodeIdentifier> nodeIds = new HashSet<>(); - final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false); - final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false); - final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, false); - final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, false); + final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false); + final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false); + final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, 8303, false); + final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, "localhost", 8401, "localhost", 8402, 8403, false); nodeIds.add(id1); nodeIds.add(id2); nodeIds.add(id3); @@ -159,7 +159,7 @@ public class TestThreadPoolRequestReplicator { @Test(timeout = 15000) public void testMultipleRequestWithTwoPhaseCommit() { final Set<NodeIdentifier> nodeIds = new HashSet<>(); - final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false); + final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false); nodeIds.add(nodeId); final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class); @@ -223,12 +223,12 @@ public class TestThreadPoolRequestReplicator { // build a map of connection state to node ids final Map<NodeConnectionState, List<NodeIdentifier>> nodeMap = new HashMap<>(); final List<NodeIdentifier> connectedNodes = new ArrayList<>(); - connectedNodes.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, 
"localhost", 8102, false)); - connectedNodes.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false)); + connectedNodes.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false)); + connectedNodes.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false)); nodeMap.put(NodeConnectionState.CONNECTED, connectedNodes); final List<NodeIdentifier> otherState = new ArrayList<>(); - otherState.add(new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, false)); + otherState.add(new NodeIdentifier("3", "localhost", 8300, "localhost", 8301, "localhost", 8302, 8303, false)); nodeMap.put(NodeConnectionState.CONNECTING, otherState); Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap); @@ -280,8 +280,8 @@ public class TestThreadPoolRequestReplicator { @Test(timeout = 15000) public void testOneNodeRejectsTwoPhaseCommit() { final Set<NodeIdentifier> nodeIds = new HashSet<>(); - nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, false)); - nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, false)); + nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 8101, "localhost", 8102, 8103, false)); + nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 8201, "localhost", 8202, 8203, false)); final ClusterCoordinator coordinator = createClusterCoordinator(); final AtomicInteger requestCount = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java index 994370c..de2d810 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java @@ -450,8 +450,8 @@ public class TestNodeClusterCoordinator { @Test public void testProposedIdentifierResolvedIfConflict() { - final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, false); - final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, false); + final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 8000, "localhost", 9000, "localhost", 10000, 11000, false); + final NodeIdentifier conflictingId = new NodeIdentifier("1234", "localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false); final ConnectionRequest connectionRequest = new ConnectionRequest(id1); final ConnectionRequestMessage crm = new ConnectionRequestMessage(); @@ -484,7 +484,7 @@ public class TestNodeClusterCoordinator { private NodeIdentifier createNodeId(final int index) { - return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, false); + return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, false); } private ProtocolMessage requestConnection(final NodeIdentifier requestedNodeId, final 
NodeClusterCoordinator coordinator) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 22ccb43..ff96fba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -21,6 +21,7 @@ import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import java.net.URI; import java.util.Date; @@ -158,6 +159,26 @@ public interface RemoteProcessGroup extends Authorizable, Positionable { */ EventReporter getEventReporter(); + SiteToSiteTransportProtocol getTransportProtocol(); + + void setTransportProtocol(SiteToSiteTransportProtocol transportProtocol); + + String getProxyHost(); + + void setProxyHost(String proxyHost); + + Integer getProxyPort(); + + void setProxyPort(Integer proxyPort); + + String getProxyUser(); + + void setProxyUser(String proxyUser); + + String getProxyPassword(); + + void setProxyPassword(String proxyPassword); + /** * Initiates a task in the remote process group to re-initialize, as a * result of clustering changes 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java index b60e789..43363c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java @@ -16,15 +16,14 @@ */ package org.apache.nifi.remote; -import java.util.Map; -import java.util.Set; - import org.apache.nifi.connectable.Port; import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.NotAuthorizedException; import org.apache.nifi.remote.exception.RequestExpiredException; import org.apache.nifi.remote.protocol.ServerProtocol; +import java.util.Set; + public interface RootGroupPort extends Port { boolean isTransmitting(); @@ -52,20 +51,18 @@ public interface RootGroupPort extends Port { * * @param peer peer * @param serverProtocol protocol - * @param requestHeaders headers * * @return the number of FlowFiles received * @throws org.apache.nifi.remote.exception.NotAuthorizedException nae * @throws org.apache.nifi.remote.exception.BadRequestException bre * @throws org.apache.nifi.remote.exception.RequestExpiredException ree */ - int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; + int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol) 
throws NotAuthorizedException, BadRequestException, RequestExpiredException; /** * Transfers data to the given stream * * @param peer peer - * @param requestHeaders headers * @param serverProtocol protocol * * @return the number of FlowFiles transferred @@ -73,6 +70,6 @@ public interface RootGroupPort extends Port { * @throws org.apache.nifi.remote.exception.BadRequestException bre * @throws org.apache.nifi.remote.exception.RequestExpiredException ree */ - int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; + int transferFlowFiles(Peer peer, ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f638820..54beeb0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -185,6 +185,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.HttpRemoteSiteListener; import 
org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RemoteResourceManager; import org.apache.nifi.remote.RemoteSiteListener; @@ -194,6 +195,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRootGroupPort; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; @@ -268,7 +270,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final ExtensionManager extensionManager; private final NiFiProperties properties; private final SSLContext sslContext; - private final RemoteSiteListener externalSiteListener; + private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>(); private final AtomicReference<CounterRepository> counterRepositoryRef; private final AtomicBoolean initialized = new AtomicBoolean(false); private final StandardControllerServiceProvider controllerServiceProvider; @@ -289,8 +291,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false); private final Integer remoteInputSocketPort; + private final Integer remoteInputHttpPort; private final Boolean isSiteToSiteSecure; private Integer clusterManagerRemoteSitePort = null; + private Integer clusterManagerRemoteSiteHttpPort = null; private Boolean clusterManagerRemoteSiteCommsSecure = null; private ProcessGroup rootGroup; @@ -398,7 +402,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R bulletinRepo, heartbeatMonitor); - flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); + 
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); return flowController; } @@ -484,6 +488,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R gracefulShutdownSeconds = shutdownSecs; remoteInputSocketPort = properties.getRemoteInputPort(); + remoteInputHttpPort = properties.getRemoteInputHttpPort(); isSiteToSiteSecure = properties.isSiteToSiteSecure(); if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) { @@ -503,17 +508,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R controllerServiceProvider.setRootProcessGroup(rootGroup); if (remoteInputSocketPort == null) { - LOG.info("Not enabling Site-to-Site functionality because nifi.remote.input.socket.port is not set"); - externalSiteListener = null; + LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set"); } else if (isSiteToSiteSecure && sslContext == null) { LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore " + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed."); - externalSiteListener = null; } else { // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class); - externalSiteListener = new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null); - externalSiteListener.setRootGroup(rootGroup); + externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? 
sslContext : null)); + } + + if (remoteInputHttpPort == null) { + LOG.info("Not enabling HTTP(S) Site-to-Site functionality because nifi.remote.input.html.enabled is not true"); + } else { + externalSiteListeners.add(HttpRemoteSiteListener.getInstance()); + } + + for(RemoteSiteListener listener : externalSiteListeners) { + listener.setRootGroup(rootGroup); } // Determine frequency for obtaining component status snapshots @@ -650,8 +662,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // ContentRepository to purge superfluous files contentRepository.cleanup(); - if (externalSiteListener != null) { - externalSiteListener.start(); + for(RemoteSiteListener listener : externalSiteListeners) { + listener.start(); } notifyComponentsConfigurationRestored(); @@ -1288,8 +1300,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R + "will take an indeterminate amount of time to stop. Might need to kill the program manually."); } - if (externalSiteListener != null) { - externalSiteListener.stop(); + for(RemoteSiteListener listener : externalSiteListeners) { + listener.stop(); } if (processScheduler != null) { @@ -1433,8 +1445,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R try { rootGroup = group; - if (externalSiteListener != null) { - externalSiteListener.setRootGroup(group); + for(RemoteSiteListener listener : externalSiteListeners) { + listener.setRootGroup(rootGroup); } controllerServiceProvider.setRootProcessGroup(rootGroup); @@ -1661,6 +1673,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration()); + remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol())); + 
remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost()); + remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort()); + remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser()); + remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword()); remoteGroup.setProcessGroup(group); // set the input/output ports @@ -3778,10 +3795,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return new ArrayList<>(history.getActions()); } - public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Boolean commsSecure) { + public void setClusterManagerRemoteSiteInfo(final Integer managerListeningPort, final Integer managerListeningHttpPort, final Boolean commsSecure) { writeLock.lock(); try { clusterManagerRemoteSitePort = managerListeningPort; + clusterManagerRemoteSiteHttpPort = managerListeningHttpPort; clusterManagerRemoteSiteCommsSecure = commsSecure; } finally { writeLock.unlock(); @@ -3797,6 +3815,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + + public Integer getClusterManagerRemoteSiteListeningHttpPort() { + readLock.lock(); + try { + return clusterManagerRemoteSiteHttpPort; + } finally { + readLock.unlock(); + } + } + public Boolean isClusterManagerRemoteSiteCommsSecure() { readLock.lock(); try { @@ -3810,6 +3838,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return remoteInputSocketPort; } + public Integer getRemoteSiteListeningHttpPort() { + return remoteInputHttpPort; + } + public Boolean isRemoteSiteCommsSecure() { return isSiteToSiteSecure; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 2f16f7f..922bff0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -205,7 +205,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { this.nodeId = new NodeIdentifier(nodeUuid, nodeApiAddress.getHostName(), nodeApiAddress.getPort(), nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), - properties.getRemoteInputHost(), properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); + properties.getRemoteInputHost(), properties.getRemoteInputPort(), + properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure()); } else { this.configuredForClustering = false; @@ -471,7 +472,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setClustered(true, null); clusterCoordinator.setConnected(false); - controller.setClusterManagerRemoteSiteInfo(null, null); + controller.setClusterManagerRemoteSiteInfo(null, null, null); controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)); /* @@ -585,8 +586,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // reconnect final ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(), - request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(), - request.getNodeConnectionStatuses(), request.getComponentRevisions()); + request.getManagerRemoteSiteListeningPort(), 
request.getManagerRemoteSiteListeningHttpPort(), + request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(), + request.getNodeConnectionStatuses(), request.getComponentRevisions()); connectionResponse.setCoordinatorDN(request.getRequestorDN()); loadFromConnectionResponse(connectionResponse); @@ -848,7 +850,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // mark the node as clustered controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN()); - controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.isManagerRemoteCommsSecure()); + controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure()); controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED)); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 93b9761..6b938d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -53,6 +53,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.remote.RemoteGroupPort; import 
org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; @@ -923,7 +924,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // add remote process group final List<Element> remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup"); for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) { - final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement); + final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor); final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUri()); remoteGroup.setComments(remoteGroupDto.getComments()); remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition())); @@ -938,6 +939,27 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration()); } + String transportProtocol = remoteGroupDto.getTransportProtocol(); + if (transportProtocol != null && !transportProtocol.trim().isEmpty()) { + remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase())); + } + + if (remoteGroupDto.getProxyHost() != null) { + remoteGroup.setProxyHost(remoteGroupDto.getProxyHost()); + } + + if (remoteGroupDto.getProxyPort() != null) { + remoteGroup.setProxyPort(remoteGroupDto.getProxyPort()); + } + + if (remoteGroupDto.getProxyUser() != null) { + remoteGroup.setProxyUser(remoteGroupDto.getProxyUser()); + } + + if (remoteGroupDto.getProxyPassword() != null) { + remoteGroup.setProxyPassword(remoteGroupDto.getProxyPassword()); + } + final Set<RemoteProcessGroupPortDescriptor> 
inputPorts = new HashSet<>(); for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java index bcf692d..b7a55ad 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java @@ -247,6 +247,7 @@ public class TemplateUtils { remoteProcessGroupDTO.setInputPortCount(null); remoteProcessGroupDTO.setOutputPortCount(null); remoteProcessGroupDTO.setTransmitting(null); + remoteProcessGroupDTO.setProxyPassword(null); // if this remote process group has contents if (remoteProcessGroupDTO.getContents() != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 
f6de870..ef910ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -175,7 +175,7 @@ public class FlowFromDOMFactory { nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup"); for (int i = 0; i < nodeList.getLength(); i++) { - remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i))); + remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i), encryptor)); } nodeList = DomUtils.getChildNodesByTagName(element, "connection"); @@ -246,7 +246,7 @@ public class FlowFromDOMFactory { return dto; } - public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) { + public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element, final StringEncryptor encryptor) { final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); dto.setId(getString(element, "id")); dto.setName(getString(element, "name")); @@ -255,6 +255,13 @@ public class FlowFromDOMFactory { dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setCommunicationsTimeout(getString(element, "timeout")); dto.setComments(getString(element, "comment")); + dto.setYieldDuration(getString(element, "yieldPeriod")); + dto.setTransportProtocol(getString(element, "transportProtocol")); + dto.setProxyHost(getString(element, "proxyHost")); + dto.setProxyPort(getOptionalInt(element, "proxyPort")); + dto.setProxyUser(getString(element, "proxyUser")); + String proxyPassword = decrypt(getString(element, "proxyPassword"), encryptor); + dto.setProxyPassword(proxyPassword); return dto; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index e26f550..e774637 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -56,6 +56,7 @@ import org.apache.nifi.persistence.TemplateSerializer; import org.apache.nifi.processor.Relationship; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.util.StringUtils; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -244,6 +245,16 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout()); addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration()); addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting())); + addTextElement(element, "transportProtocol", remoteRef.getTransportProtocol().name()); + addTextElement(element, "proxyHost", remoteRef.getProxyHost()); + if (remoteRef.getProxyPort() != null) { + addTextElement(element, "proxyPort", remoteRef.getProxyPort()); + } + addTextElement(element, "proxyUser", remoteRef.getProxyUser()); + if 
(!StringUtils.isEmpty(remoteRef.getProxyPassword())) { + String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; + addTextElement(element, "proxyPassword", value); + } for (final RemoteGroupPort port : remoteRef.getInputPorts()) { if (port.hasIncomingConnection()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index cdabeca..fb9da32 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -17,7 +17,6 @@ package org.apache.nifi.remote; import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.UniformInterfaceException; import org.apache.nifi.authorization.Resource; @@ -39,6 +38,9 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; +import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.reporting.BulletinRepository; import 
org.apache.nifi.reporting.ComponentType; import org.apache.nifi.reporting.Severity; @@ -46,13 +48,12 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.entity.ControllerEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; -import javax.ws.rs.core.Response; import java.io.File; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -82,13 +83,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private static final Logger logger = LoggerFactory.getLogger(StandardRemoteProcessGroup.class); - public static final String SITE_TO_SITE_URI_PATH = "/site-to-site"; - public static final String ROOT_GROUP_STATUS_URI_PATH = "/flow/process-groups/root/status"; - // status codes - public static final int OK_STATUS_CODE = Status.OK.getStatusCode(); - public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode(); - public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode(); + private static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode(); + private static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode(); private final String id; @@ -112,6 +109,12 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private volatile String communicationsTimeout = "30 sec"; private volatile String targetId; private volatile String yieldDuration = "10 sec"; + private volatile SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW; + private volatile String proxyHost; + private volatile Integer proxyPort; + private volatile String proxyUser; + private volatile String proxyPassword; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock 
= rwLock.readLock(); @@ -127,6 +130,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private Long refreshContentsTimestamp = null; private Boolean destinationSecure; private Integer listeningPort; + private Integer listeningHttpPort; private volatile String authorizationIssue; @@ -235,6 +239,56 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { this.targetId = targetId; } + @Override + public void setTransportProtocol(final SiteToSiteTransportProtocol transportProtocol) { + this.transportProtocol = transportProtocol; + } + + @Override + public SiteToSiteTransportProtocol getTransportProtocol() { + return transportProtocol; + } + + @Override + public String getProxyHost() { + return proxyHost; + } + + @Override + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + @Override + public Integer getProxyPort() { + return proxyPort; + } + + @Override + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } + + @Override + public String getProxyUser() { + return proxyUser; + } + + @Override + public void setProxyUser(String proxyUser) { + this.proxyUser = proxyUser; + } + + @Override + public String getProxyPassword() { + return proxyPassword; + } + + @Override + public void setProxyPassword(String proxyPassword) { + this.proxyPassword = proxyPassword; + } + /** * @return the ID of the Root Group on the remote instance */ @@ -696,10 +750,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return parent == null ? 
context : getRootGroup(parent); } - private boolean isWebApiSecure() { - return targetUri.toString().toLowerCase().startsWith("https"); - } - private void refreshFlowContentsFromLocal() { final ProcessGroup rootGroup = getRootGroup(); setName(rootGroup.getName()); @@ -725,6 +775,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final NiFiProperties props = NiFiProperties.getInstance(); this.destinationSecure = props.isSiteToSiteSecure(); this.listeningPort = props.getRemoteInputPort(); + this.listeningHttpPort = props.getRemoteInputHttpPort(); refreshContentsTimestamp = System.currentTimeMillis(); } finally { @@ -760,20 +811,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return; } - final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null); - final String uriVal = apiUri.toString() + SITE_TO_SITE_URI_PATH; - URI uri; - try { - uri = new URI(uriVal); - } catch (final URISyntaxException e) { - throw new CommunicationsException("Invalid URI: " + uriVal); - } - try { // perform the request - final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - - if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) { + final ControllerDTO dto; + try ( + final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient(); + ){ + dto = apiClient.getController(); + } catch (IOException e) { writeLock.lock(); try { for (final Iterator<StandardRemoteGroupPort> iter = inputPorts.values().iterator(); iter.hasNext();) { @@ -793,15 +838,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { writeLock.unlock(); } - // consume the entity entirely - response.getEntity(String.class); - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". 
Got HTTP Error Code " - + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase()); + throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + apiUri + " due to: " + e.getMessage()); } - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - final ControllerDTO dto = entity.getController(); - writeLock.lock(); try { if (dto.getInputPorts() != null) { @@ -853,6 +892,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } this.listeningPort = dto.getRemoteSiteListeningPort(); + this.listeningHttpPort = dto.getRemoteSiteHttpListeningPort(); this.destinationSecure = dto.isSiteToSiteSecure(); final ProcessGroupCounts newCounts = new ProcessGroupCounts(inputPortCount, outputPortCount, @@ -867,6 +907,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } + private SiteToSiteRestApiClient getSiteToSiteRestApiClient() { + SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword)); + apiClient.setBaseUrl(apiUri.toString()); + apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + return apiClient; + } + /** * Converts a set of ports into a set of remote process group ports. * @@ -1075,7 +1123,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public boolean isSiteToSiteEnabled() { readLock.lock(); try { - return this.listeningPort != null; + return (this.listeningPort != null || this.listeningHttpPort != null); } finally { readLock.unlock(); } @@ -1090,18 +1138,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void run() { - try { - final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? 
sslContext : null); - final ClientResponse response = utils.get(new URI(apiUri + SITE_TO_SITE_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - - final int statusCode = response.getStatus(); - - if (statusCode == OK_STATUS_CODE) { - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - final ControllerDTO dto = entity.getController(); + try ( + final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient(); + ){ + try { + final ControllerDTO dto = apiClient.getController(); - if (dto.getRemoteSiteListeningPort() == null) { - authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time."; + if (dto.getRemoteSiteListeningPort() == null && SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) { + authorizationIssue = "Remote instance is not configured to allow RAW Site-to-Site communications at this time."; + } else if (dto.getRemoteSiteHttpListeningPort() == null && SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) { + authorizationIssue = "Remote instance is not configured to allow HTTP Site-to-Site communications at this time."; } else { authorizationIssue = null; } @@ -1109,6 +1155,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { writeLock.lock(); try { listeningPort = dto.getRemoteSiteListeningPort(); + listeningHttpPort = dto.getRemoteSiteHttpListeningPort(); destinationSecure = dto.isSiteToSiteSecure(); } finally { writeLock.unlock(); @@ -1117,31 +1164,39 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final String remoteInstanceId = dto.getInstanceId(); final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); pointsToCluster.set(isPointingToCluster); - } else if (statusCode == UNAUTHORIZED_STATUS_CODE) { - try { - final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); - if 
(Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) { - logger.info("{} Issued a Request to communicate with remote instance", this); - } else { - logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ - this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()}); - } - } catch (final Exception e) { - logger.error("{} Failed to request account due to {}", this, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); + + } catch (SiteToSiteRestApiClient.HttpGetFailedException e) { + + if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) { + // TODO: implement registration request + /* + try { + final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); + if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) { + logger.info("{} Issued a Request to communicate with remote instance", this); + } else { + logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ + this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()}); + } + } catch (final Exception e) { + logger.error("{} Failed to request account due to {}", this, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } } - } + */ + authorizationIssue = e.getDescription(); - authorizationIssue = response.getEntity(String.class); - } else if (statusCode == FORBIDDEN_STATUS_CODE) { - authorizationIssue = response.getEntity(String.class); - } else { - final String message = response.getEntity(String.class); - logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", - new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message}); - authorizationIssue = "Unable to determine Site-to-Site availability."; + } else if 
(e.getResponseCode() == FORBIDDEN_STATUS_CODE) { + authorizationIssue = e.getDescription(); + } else { + final String message = e.getDescription(); + logger.warn("{} When communicating with remote instance, got unexpected result. {}", + new Object[]{this, e.getMessage()}); + authorizationIssue = "Unable to determine Site-to-Site availability."; + } } + } catch (final Exception e) { logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s", http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 0017630..dfbc6fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -111,9 +111,11 @@ nifi.components.status.repository.buffer.size=${nifi.components.status.repositor nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency} # Site to Site properties -nifi.remote.input.socket.host= +nifi.remote.input.host= +nifi.remote.input.secure=false nifi.remote.input.socket.port= -nifi.remote.input.secure=true +nifi.remote.input.http.enabled=true +nifi.remote.input.http.transaction.ttl=30 sec # web properties # nifi.web.war.directory=${nifi.web.war.directory} 
// File: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.remote;

import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.remote.protocol.FlowFileTransaction;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.nifi.util.NiFiProperties.DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL;
import static org.apache.nifi.util.NiFiProperties.SITE_TO_SITE_HTTP_TRANSACTION_TTL;

/**
 * Keeps track of HTTP Site-to-Site transactions by transaction id and
 * periodically cancels (rolling back the session of) those whose TTL elapsed.
 *
 * <p>A single shared instance is obtained through {@link #getInstance()}.
 * Transactions are stored in a {@link ConcurrentHashMap}; the maintenance task
 * runs on a dedicated daemon thread.</p>
 */
public class HttpRemoteSiteListener implements RemoteSiteListener {

    private static final Logger logger = LoggerFactory.getLogger(HttpRemoteSiteListener.class);

    // volatile is required for the double-checked locking in getInstance() to publish
    // a fully constructed instance to other threads.
    private static volatile HttpRemoteSiteListener instance;

    private final int transactionTtlSec;
    private final Map<String, TransactionWrapper> transactions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService taskExecutor;
    private final int httpListenPort;
    private ProcessGroup rootGroup;
    private ScheduledFuture<?> transactionMaintenanceTask;

    /**
     * Creates the maintenance executor and reads the transaction TTL and the
     * HTTP listen port from {@link NiFiProperties}. If the configured TTL
     * cannot be parsed, the default TTL is used and a warning is logged.
     */
    private HttpRemoteSiteListener() {
        taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = defaultFactory.newThread(r);
                thread.setName("Http Site-to-Site Transaction Maintenance");
                thread.setDaemon(true);
                return thread;
            }
        });

        final NiFiProperties properties = NiFiProperties.getInstance();
        int txTtlSec;
        try {
            final String configuredTtl = properties.getProperty(SITE_TO_SITE_HTTP_TRANSACTION_TTL, DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL);
            txTtlSec = (int) FormatUtils.getTimeDuration(configuredTtl, TimeUnit.SECONDS);
        } catch (final Exception e) {
            txTtlSec = (int) FormatUtils.getTimeDuration(DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL, TimeUnit.SECONDS);
            logger.warn("Failed to parse {} due to {}, use default as {} secs.",
                    SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), txTtlSec);
        }
        transactionTtlSec = txTtlSec;

        // A missing HTTP port is reported as 0.
        final Integer configuredPort = properties.getRemoteInputHttpPort();
        httpListenPort = configuredPort != null ? configuredPort : 0;
    }

    /**
     * Returns the shared listener, creating it on first use.
     *
     * @return the single {@code HttpRemoteSiteListener} instance
     */
    public static HttpRemoteSiteListener getInstance() {
        HttpRemoteSiteListener current = instance;
        if (current == null) {
            synchronized (HttpRemoteSiteListener.class) {
                current = instance;
                if (current == null) {
                    current = new HttpRemoteSiteListener();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Pairs a (possibly not yet available) transaction with the time of the
     * last communication, which is what the TTL is measured against.
     */
    private class TransactionWrapper {
        private final FlowFileTransaction transaction;
        // Written by callers of extend() and read by the maintenance task thread.
        private volatile long lastCommunicationAt;

        private TransactionWrapper(final FlowFileTransaction transaction) {
            this.transaction = transaction;
            this.lastCommunicationAt = System.currentTimeMillis();
        }

        private boolean isExpired() {
            final long elapsedMillis = System.currentTimeMillis() - lastCommunicationAt;
            final long elapsedSec = TimeUnit.SECONDS.convert(elapsedMillis, TimeUnit.MILLISECONDS);
            return elapsedSec > transactionTtlSec;
        }

        private void extend() {
            lastCommunicationAt = System.currentTimeMillis();
        }
    }

    @Override
    public void setRootGroup(ProcessGroup rootGroup) {
        this.rootGroup = rootGroup;
    }

    /**
     * Hands the root process group known to this listener to the given server protocol.
     *
     * @param serverProtocol the protocol instance to configure
     */
    public void setupServerProtocol(HttpFlowFileServerProtocol serverProtocol) {
        serverProtocol.setRootProcessGroup(rootGroup);
    }

    /**
     * Schedules the transaction maintenance task, which cancels every
     * transaction that is no longer active. The task first runs immediately
     * and then repeats with a delay of half the transaction TTL.
     */
    @Override
    public void start() throws IOException {
        // scheduleWithFixedDelay rejects a delay <= 0, which TTL / 2 yields for a TTL below 2 seconds.
        final long maintenanceDelaySec = Math.max(1, transactionTtlSec / 2);

        transactionMaintenanceTask = taskExecutor.scheduleWithFixedDelay(() -> {

            final int originalSize = transactions.size();
            logger.trace("Transaction maintenance task started.");
            try {
                // Work on a snapshot of the ids; cancelTransaction removes entries from the map.
                final Set<String> transactionIds = transactions.keySet().stream().collect(Collectors.toSet());
                transactionIds.stream()
                        .filter(tid -> !isTransactionActive(tid))
                        .forEach(this::cancelTransaction);
            } catch (Exception e) {
                // Swallow exception so that this thread can keep working.
                logger.error("An exception occurred while maintaining transactions", e);
            }
            logger.debug("Transaction maintenance task finished. originalSize={}, currentSize={}", originalSize, transactions.size());

        }, 0, maintenanceDelaySec, TimeUnit.SECONDS);
    }

    /**
     * Removes the transaction with the given id and, when it holds a
     * transaction with a session, rolls that session back. Rollback failures
     * are logged and not propagated.
     *
     * @param transactionId id of the transaction to cancel
     */
    public void cancelTransaction(String transactionId) {
        final TransactionWrapper wrapper = transactions.remove(transactionId);
        if (wrapper == null) {
            logger.debug("The transaction was not found. transactionId={}", transactionId);
            return;
        }

        logger.debug("Cancel a transaction. transactionId={}", transactionId);
        final FlowFileTransaction t = wrapper.transaction;
        if (t != null && t.getSession() != null) {
            logger.info("Cancel a transaction, rollback its session. transactionId={}", transactionId);
            try {
                t.getSession().rollback();
            } catch (Exception e) {
                // Swallow exception so that it can keep expiring other transactions.
                logger.error("Failed to rollback. transactionId={}", transactionId, e);
            }
        }
    }

    @Override
    public int getPort() {
        return httpListenPort;
    }

    /**
     * Cancels the scheduled maintenance task, if one was scheduled.
     */
    @Override
    public void stop() {
        if (transactionMaintenanceTask != null) {
            logger.debug("Stopping transactionMaintenanceTask...");
            transactionMaintenanceTask.cancel(true);
        }
    }

    /**
     * Registers a new transaction id that does not hold a transaction yet.
     *
     * @return the newly generated transaction id
     */
    public String createTransaction() {
        final String transactionId = UUID.randomUUID().toString();
        transactions.put(transactionId, new TransactionWrapper(null));
        logger.debug("Created a new transaction: {}", transactionId);
        return transactionId;
    }

    /**
     * @param transactionId id to look up
     * @return {@code true} if the id is known and its TTL has not elapsed
     */
    public boolean isTransactionActive(final String transactionId) {
        final TransactionWrapper transaction = transactions.get(transactionId);
        return transaction != null && !transaction.isExpired();
    }

    /**
     * Associates the given transaction with the id and restarts its TTL.
     * The existing entry for the id is removed before validation, so it stays
     * removed when this method throws.
     *
     * @param transactionId id the transaction is stored under
     * @param transaction transaction to hold; must already have a session
     * @throws IllegalStateException if the id already holds a transaction, or
     *         if the passed transaction has no session
     */
    public void holdTransaction(final String transactionId, final FlowFileTransaction transaction) throws IllegalStateException {
        // We don't check expiration of the transaction here, to support large file transport or slow network.
        // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
        final TransactionWrapper currentTransaction = transactions.remove(transactionId);
        if (currentTransaction == null) {
            logger.debug("The transaction was not found, it looks it took longer than transaction TTL.");
        } else if (currentTransaction.transaction != null) {
            throw new IllegalStateException("Transaction has already been processed. It can only be finalized. transactionId=" + transactionId);
        }
        if (transaction.getSession() == null) {
            throw new IllegalStateException("Passed transaction is not associated any session yet, can not hold. transactionId=" + transactionId);
        }
        logger.debug("Holding a transaction: {}", transactionId);
        // Server has received or sent all data, and transaction TTL count down starts here.
        // However, if the client doesn't consume data fast enough, server might expire and rollback the transaction.
        transactions.put(transactionId, new TransactionWrapper(transaction));
    }

    /**
     * Removes the entry for the given id and returns the transaction it held.
     * The entry is removed even when it holds no transaction yet, in which
     * case an exception is thrown.
     *
     * @param transactionId id of the transaction to finalize
     * @return the held transaction
     * @throws IllegalStateException if the id is unknown, expired, or does not
     *         hold a transaction yet
     */
    public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException {
        if (!isTransactionActive(transactionId)) {
            throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
        }
        final TransactionWrapper transaction = transactions.remove(transactionId);
        if (transaction == null) {
            throw new IllegalStateException("Transaction was not found anymore. It's already finalized or expired. transactionId=" + transactionId);
        }
        if (transaction.transaction == null) {
            throw new IllegalStateException("Transaction has not started yet.");
        }
        logger.debug("Finalized a transaction: {}", transactionId);
        return transaction.transaction;
    }

    /**
     * Restarts the TTL count down of an active transaction.
     *
     * @param transactionId id of the transaction to extend
     * @throws IllegalStateException if the id is unknown or already expired
     */
    public void extendsTransaction(final String transactionId) throws IllegalStateException {
        if (!isTransactionActive(transactionId)) {
            throw new IllegalStateException("Transaction was not found or not active anymore. transactionId=" + transactionId);
        }
        final TransactionWrapper transaction = transactions.get(transactionId);
        if (transaction != null) {
            logger.debug("Extending transaction TTL, transactionId={}", transactionId);
            transaction.extend();
        }
    }

    /**
     * @return the transaction TTL in seconds
     */
    public int getTransactionTtlSec() {
        return transactionTtlSec;
    }

}
