http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
index a4eb46e..9a53a72 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -34,6 +34,7 @@ public class AdaptedConnectionResponse {
     private String rejectionReason;
     private int tryLaterSeconds;
     private Integer managerRemoteInputPort;
+    private Integer managerRemoteInputHttpPort;
     private Boolean managerRemoteCommsSecure;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
@@ -88,6 +89,14 @@ public class AdaptedConnectionResponse {
         return managerRemoteInputPort;
     }
 
+    public void setManagerRemoteInputHttpPort(Integer 
managerRemoteInputHttpPort) {
+        this.managerRemoteInputHttpPort = managerRemoteInputHttpPort;
+    }
+
+    public Integer getManagerRemoteInputHttpPort() {
+        return managerRemoteInputHttpPort;
+    }
+
     public void setManagerRemoteCommsSecure(Boolean secure) {
         this.managerRemoteCommsSecure = secure;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
index beca014..a2d9968 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
@@ -27,6 +27,8 @@ public class AdaptedNodeIdentifier {
     private int socketPort;
     private String siteToSiteAddress;
     private Integer siteToSitePort;
+    private Integer siteToSiteHttpApiPort;
+
     private boolean siteToSiteSecure;
 
     public AdaptedNodeIdentifier() {
@@ -96,4 +98,13 @@ public class AdaptedNodeIdentifier {
     public void setSiteToSiteSecure(boolean siteToSiteSecure) {
         this.siteToSiteSecure = siteToSiteSecure;
     }
+
+    public Integer getSiteToSiteHttpApiPort() {
+        return siteToSiteHttpApiPort;
+    }
+
+    public void setSiteToSiteHttpApiPort(Integer siteToSiteHttpApiPort) {
+        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index ca98a86..cf64e71 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -32,6 +32,7 @@ public class ConnectionResponseAdapter extends 
XmlAdapter<AdaptedConnectionRespo
             aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
             aCr.setRejectionReason(cr.getRejectionReason());
             aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
+            
aCr.setManagerRemoteInputHttpPort(cr.getManagerRemoteInputHttpPort());
             aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
             aCr.setInstanceId(cr.getInstanceId());
             aCr.setNodeConnectionStatuses(cr.getNodeConnectionStatuses());
@@ -48,7 +49,7 @@ public class ConnectionResponseAdapter extends 
XmlAdapter<AdaptedConnectionRespo
             return 
ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {
             return new ConnectionResponse(aCr.getNodeIdentifier(), 
aCr.getDataFlow(),
-                aCr.getManagerRemoteInputPort(), 
aCr.isManagerRemoteCommsSecure(),
+                aCr.getManagerRemoteInputPort(), 
aCr.getManagerRemoteInputHttpPort(), aCr.isManagerRemoteCommsSecure(),
                 aCr.getInstanceId(), aCr.getNodeConnectionStatuses(), 
aCr.getComponentRevisions());
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
index b040cd4..4a2660f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
@@ -36,6 +36,7 @@ public class NodeIdentifierAdapter extends 
XmlAdapter<AdaptedNodeIdentifier, Nod
             aNi.setSocketPort(ni.getSocketPort());
             aNi.setSiteToSiteAddress(ni.getSiteToSiteAddress());
             aNi.setSiteToSitePort(ni.getSiteToSitePort());
+            aNi.setSiteToSiteHttpApiPort(ni.getSiteToSiteHttpApiPort());
             aNi.setSiteToSiteSecure(ni.isSiteToSiteSecure());
             return aNi;
         }
@@ -47,7 +48,7 @@ public class NodeIdentifierAdapter extends 
XmlAdapter<AdaptedNodeIdentifier, Nod
             return null;
         } else {
             return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), 
aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort(),
-                aNi.getSiteToSiteAddress(), aNi.getSiteToSitePort(), 
aNi.isSiteToSiteSecure());
+                aNi.getSiteToSiteAddress(), 
aNi.getSiteToSitePort(),aNi.getSiteToSiteHttpApiPort(), 
aNi.isSiteToSiteSecure());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
index 4208498..008b586 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java
@@ -36,6 +36,7 @@ public class ReconnectionRequestMessage extends 
ProtocolMessage {
     private StandardDataFlow dataFlow;
     private boolean primary;
     private Integer managerRemoteSiteListeningPort;
+    private Integer managerRemoteSiteListeningHttpPort;
     private Boolean managerRemoteSiteCommsSecure;
     private String instanceId;
     private List<NodeConnectionStatus> nodeStatuses;
@@ -82,6 +83,14 @@ public class ReconnectionRequestMessage extends 
ProtocolMessage {
         return managerRemoteSiteListeningPort;
     }
 
+    public void setManagerRemoteSiteListeningHttpPort(Integer 
managerRemoteSiteListeningHttpPort) {
+        this.managerRemoteSiteListeningHttpPort = 
managerRemoteSiteListeningHttpPort;
+    }
+
+    public Integer getManagerRemoteSiteListeningHttpPort() {
+        return managerRemoteSiteListeningHttpPort;
+    }
+
     public void setManagerRemoteSiteCommsSecure(final Boolean 
remoteSiteCommsSecure) {
         this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 25ab73a..8edabd2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -45,11 +45,11 @@ public class TestJaxbProtocolUtils {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         final ConnectionResponseMessage msg = new ConnectionResponseMessage();
-        final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 
8000, "localhost", 8001, "localhost", 8002, true);
+        final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 
8000, "localhost", 8001, "localhost", 8002, 8003, true);
         final DataFlow dataFlow = new StandardDataFlow(new byte[0], new 
byte[0]);
         final List<NodeConnectionStatus> nodeStatuses = 
Collections.singletonList(new NodeConnectionStatus(nodeId, 
DisconnectionCode.NOT_YET_CONNECTED));
         final List<ComponentRevision> componentRevisions = 
Collections.singletonList(ComponentRevision.fromRevision(new Revision(8L, 
"client-1", "component-1")));
-        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 80, 
false, "instance-1", nodeStatuses, componentRevisions));
+        msg.setConnectionResponse(new ConnectionResponse(nodeId, dataFlow, 
9990, 8080, false, "instance-1", nodeStatuses, componentRevisions));
 
         JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
         final Object unmarshalled = 
JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new 
ByteArrayInputStream(baos.toByteArray()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 1a8c59e..158468a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -620,8 +620,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         } else {
             // there is a node with that ID and it's a different node
             resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), 
proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(),
-                proposedIdentifier.getSocketAddress(), 
proposedIdentifier.getSocketPort(),
-                proposedIdentifier.getSiteToSiteAddress(), 
proposedIdentifier.getSiteToSitePort(), 
proposedIdentifier.isSiteToSiteSecure());
+                proposedIdentifier.getSocketAddress(), 
proposedIdentifier.getSocketPort(), proposedIdentifier.getSiteToSiteAddress(),
+                proposedIdentifier.getSiteToSitePort(), 
proposedIdentifier.getSiteToSiteHttpApiPort(), 
proposedIdentifier.isSiteToSiteSecure());
             logger.debug("A node already exists with ID {}. Proposed Node 
Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is 
{}",
                 proposedIdentifier.getId(), proposedIdentifier, 
getNodeIdentifier(proposedIdentifier.getId()), resolvedNodeId);
         }
@@ -682,7 +682,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
         // TODO: Remove the 'null' values here from the ConnectionResponse all 
together. These
         // will no longer be needed for site-to-site once the NCM is gone.
-        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, 
null, instanceId,
+        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, null, 
null, null, instanceId,
             new ArrayList<>(nodeStatuses.values()),
             revisionManager.getAllRevisions().stream().map(rev -> 
ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
     }
@@ -690,7 +690,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final 
String dn) {
         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort(),
             nodeId.getSocketAddress(), nodeId.getSocketPort(),
-            nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), 
nodeId.isSiteToSiteSecure(), dn);
+            nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(),
+            nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), 
dn);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index d8c4e19..7640787 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -51,7 +51,7 @@ public class TestAbstractHeartbeatMonitor {
 
     @Before
     public void setup() throws Exception {
-        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
9999, "localhost", 8888, "localhost", null, false);
+        nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
9999, "localhost", 8888, "localhost", null, null, false);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
index f4d5d2e..f7890b8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/TestProcessorEndpointMerger.java
@@ -36,14 +36,14 @@ public class TestProcessorEndpointMerger {
         final ProcessorEndpointMerger merger = new ProcessorEndpointMerger();
         final Map<String, Set<NodeIdentifier>> validationErrorMap = new 
HashMap<>();
 
-        final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", 
"localhost", 9000, "localhost", 9001, "localhost", 9002, false);
+        final NodeIdentifier nodeId1234 = new NodeIdentifier("1234", 
"localhost", 9000, "localhost", 9001, "localhost", 9002, 9003, false);
         final List<String> nodeValidationErrors1234 = new ArrayList<>();
         nodeValidationErrors1234.add("error 1");
         nodeValidationErrors1234.add("error 2");
 
         merger.mergeValidationErrors(validationErrorMap, nodeId1234, 
nodeValidationErrors1234);
 
-        final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 
8000, "localhost", 8001, "localhost", 8002, false);
+        final NodeIdentifier nodeXyz = new NodeIdentifier("xyz", "localhost", 
8000, "localhost", 8001, "localhost", 8002, 8003, false);
         final List<String> nodeValidationErrorsXyz = new ArrayList<>();
         nodeValidationErrorsXyz.add("error 1");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
index c97db03..cc71b9b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestResponseUtils.java
@@ -40,10 +40,10 @@ public class TestResponseUtils {
     @Test
     public void testFindLongResponseTimes() throws URISyntaxException {
         final Map<NodeIdentifier, NodeResponse> responses = new HashMap<>();
-        final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, false);
-        final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, 
"localhost", 8201, "localhost", 8202, false);
-        final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, 
"localhost", 8301, "localhost", 8302, false);
-        final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, 
"localhost", 8401, "localhost", 8402, false);
+        final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, 8003, false);
+        final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 8200, 
"localhost", 8201, "localhost", 8202, 8203, false);
+        final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 8300, 
"localhost", 8301, "localhost", 8302, 8303, false);
+        final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 8400, 
"localhost", 8401, "localhost", 8402, 8403, false);
 
         final URI uri = new URI("localhost:8080");
         final ClientResponse clientResponse = 
Mockito.mock(ClientResponse.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 0bce521..a9c0af9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -82,7 +82,7 @@ public class TestThreadPoolRequestReplicator {
     public void testResponseRemovedWhenCompletedAndFetched() {
         withReplicator(replicator -> {
             final Set<NodeIdentifier> nodeIds = new HashSet<>();
-            nodeIds.add(new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, false));
+            nodeIds.add(new NodeIdentifier("1", "localhost", 8000, 
"localhost", 8001, "localhost", 8002, 8003, false));
             final URI uri = new URI("http://localhost:8080/processors/1";);
             final Entity entity = new ProcessorEntity();
 
@@ -109,7 +109,7 @@ public class TestThreadPoolRequestReplicator {
     public void testLongWaitForResponse() {
         withReplicator(replicator -> {
             final Set<NodeIdentifier> nodeIds = new HashSet<>();
-            final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
8000, "localhost", 8001, "localhost", 8002, false);
+            final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
8000, "localhost", 8001, "localhost", 8002, 8003, false);
             nodeIds.add(nodeId);
             final URI uri = new URI("http://localhost:8080/processors/1";);
             final Entity entity = new ProcessorEntity();
@@ -138,10 +138,10 @@ public class TestThreadPoolRequestReplicator {
     public void testCompleteOnError() {
         withReplicator(replicator -> {
             final Set<NodeIdentifier> nodeIds = new HashSet<>();
-            final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 
8100, "localhost", 8101, "localhost", 8102, false);
-            final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 
8200, "localhost", 8201, "localhost", 8202, false);
-            final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 
8300, "localhost", 8301, "localhost", 8302, false);
-            final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 
8400, "localhost", 8401, "localhost", 8402, false);
+            final NodeIdentifier id1 = new NodeIdentifier("1", "localhost", 
8100, "localhost", 8101, "localhost", 8102, 8103, false);
+            final NodeIdentifier id2 = new NodeIdentifier("2", "localhost", 
8200, "localhost", 8201, "localhost", 8202, 8203, false);
+            final NodeIdentifier id3 = new NodeIdentifier("3", "localhost", 
8300, "localhost", 8301, "localhost", 8302, 8303, false);
+            final NodeIdentifier id4 = new NodeIdentifier("4", "localhost", 
8400, "localhost", 8401, "localhost", 8402, 8403, false);
             nodeIds.add(id1);
             nodeIds.add(id2);
             nodeIds.add(id3);
@@ -159,7 +159,7 @@ public class TestThreadPoolRequestReplicator {
     @Test(timeout = 15000)
     public void testMultipleRequestWithTwoPhaseCommit() {
         final Set<NodeIdentifier> nodeIds = new HashSet<>();
-        final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
8100, "localhost", 8101, "localhost", 8102, false);
+        final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
8100, "localhost", 8101, "localhost", 8102, 8103, false);
         nodeIds.add(nodeId);
 
         final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
@@ -223,12 +223,12 @@ public class TestThreadPoolRequestReplicator {
         // build a map of connection state to node ids
         final Map<NodeConnectionState, List<NodeIdentifier>> nodeMap = new 
HashMap<>();
         final List<NodeIdentifier> connectedNodes = new ArrayList<>();
-        connectedNodes.add(new NodeIdentifier("1", "localhost", 8100, 
"localhost", 8101, "localhost", 8102, false));
-        connectedNodes.add(new NodeIdentifier("2", "localhost", 8200, 
"localhost", 8201, "localhost", 8202, false));
+        connectedNodes.add(new NodeIdentifier("1", "localhost", 8100, 
"localhost", 8101, "localhost", 8102, 8103, false));
+        connectedNodes.add(new NodeIdentifier("2", "localhost", 8200, 
"localhost", 8201, "localhost", 8202, 8203, false));
         nodeMap.put(NodeConnectionState.CONNECTED, connectedNodes);
 
         final List<NodeIdentifier> otherState = new ArrayList<>();
-        otherState.add(new NodeIdentifier("3", "localhost", 8300, "localhost", 
8301, "localhost", 8302, false));
+        otherState.add(new NodeIdentifier("3", "localhost", 8300, "localhost", 
8301, "localhost", 8302, 8303, false));
         nodeMap.put(NodeConnectionState.CONNECTING, otherState);
 
         Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
@@ -280,8 +280,8 @@ public class TestThreadPoolRequestReplicator {
     @Test(timeout = 15000)
     public void testOneNodeRejectsTwoPhaseCommit() {
         final Set<NodeIdentifier> nodeIds = new HashSet<>();
-        nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 
8101, "localhost", 8102, false));
-        nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 
8201, "localhost", 8202, false));
+        nodeIds.add(new NodeIdentifier("1", "localhost", 8100, "localhost", 
8101, "localhost", 8102, 8103, false));
+        nodeIds.add(new NodeIdentifier("2", "localhost", 8200, "localhost", 
8201, "localhost", 8202, 8203, false));
 
         final ClusterCoordinator coordinator = createClusterCoordinator();
         final AtomicInteger requestCount = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 994370c..de2d810 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -450,8 +450,8 @@ public class TestNodeClusterCoordinator {
 
     @Test
     public void testProposedIdentifierResolvedIfConflict() {
-        final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, false);
-        final NodeIdentifier conflictingId = new NodeIdentifier("1234", 
"localhost", 8001, "localhost", 9000, "localhost", 10000, false);
+        final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, 11000, false);
+        final NodeIdentifier conflictingId = new NodeIdentifier("1234", 
"localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);
 
         final ConnectionRequest connectionRequest = new ConnectionRequest(id1);
         final ConnectionRequestMessage crm = new ConnectionRequestMessage();
@@ -484,7 +484,7 @@ public class TestNodeClusterCoordinator {
 
 
     private NodeIdentifier createNodeId(final int index) {
-        return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + 
index, "localhost", 9000 + index, "localhost", 10000 + index, false);
+        return new NodeIdentifier(String.valueOf(index), "localhost", 8000 + 
index, "localhost", 9000 + index, "localhost", 10000 + index, 11000 + index, 
false);
     }
 
     private ProtocolMessage requestConnection(final NodeIdentifier 
requestedNodeId, final NodeClusterCoordinator coordinator) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 22ccb43..ff96fba 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -21,6 +21,7 @@ import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 
 import java.net.URI;
 import java.util.Date;
@@ -158,6 +159,26 @@ public interface RemoteProcessGroup extends Authorizable, 
Positionable {
      */
     EventReporter getEventReporter();
 
+    SiteToSiteTransportProtocol getTransportProtocol();
+
+    void setTransportProtocol(SiteToSiteTransportProtocol transportProtocol);
+
+    String getProxyHost();
+
+    void setProxyHost(String proxyHost);
+
+    Integer getProxyPort();
+
+    void setProxyPort(Integer proxyPort);
+
+    String getProxyUser();
+
+    void setProxyUser(String proxyUser);
+
+    String getProxyPassword();
+
+    void setProxyPassword(String proxyPassword);
+
     /**
      * Initiates a task in the remote process group to re-initialize, as a
      * result of clustering changes

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
index b60e789..43363c0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java
@@ -16,15 +16,14 @@
  */
 package org.apache.nifi.remote;
 
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.NotAuthorizedException;
 import org.apache.nifi.remote.exception.RequestExpiredException;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 
+import java.util.Set;
+
 public interface RootGroupPort extends Port {
 
     boolean isTransmitting();
@@ -52,20 +51,18 @@ public interface RootGroupPort extends Port {
      *
      * @param peer peer
      * @param serverProtocol protocol
-     * @param requestHeaders headers
      *
      * @return the number of FlowFiles received
      * @throws org.apache.nifi.remote.exception.NotAuthorizedException nae
      * @throws org.apache.nifi.remote.exception.BadRequestException bre
      * @throws org.apache.nifi.remote.exception.RequestExpiredException ree
      */
-    int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, 
String> requestHeaders) throws NotAuthorizedException, BadRequestException, 
RequestExpiredException;
+    int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol) throws 
NotAuthorizedException, BadRequestException, RequestExpiredException;
 
     /**
      * Transfers data to the given stream
      *
      * @param peer peer
-     * @param requestHeaders headers
      * @param serverProtocol protocol
      *
      * @return the number of FlowFiles transferred
@@ -73,6 +70,6 @@ public interface RootGroupPort extends Port {
      * @throws org.apache.nifi.remote.exception.BadRequestException bre
      * @throws org.apache.nifi.remote.exception.RequestExpiredException ree
      */
-    int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, 
Map<String, String> requestHeaders) throws NotAuthorizedException, 
BadRequestException, RequestExpiredException;
+    int transferFlowFiles(Peer peer, ServerProtocol serverProtocol) throws 
NotAuthorizedException, BadRequestException, RequestExpiredException;
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index f638820..54beeb0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -185,6 +185,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RemoteResourceManager;
 import org.apache.nifi.remote.RemoteSiteListener;
@@ -194,6 +195,7 @@ import org.apache.nifi.remote.StandardRemoteProcessGroup;
 import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.StandardRootGroupPort;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -268,7 +270,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final ExtensionManager extensionManager;
     private final NiFiProperties properties;
     private final SSLContext sslContext;
-    private final RemoteSiteListener externalSiteListener;
+    private final Set<RemoteSiteListener> externalSiteListeners = new 
HashSet<>();
     private final AtomicReference<CounterRepository> counterRepositoryRef;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final StandardControllerServiceProvider controllerServiceProvider;
@@ -289,8 +291,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false);
 
     private final Integer remoteInputSocketPort;
+    private final Integer remoteInputHttpPort;
     private final Boolean isSiteToSiteSecure;
     private Integer clusterManagerRemoteSitePort = null;
+    private Integer clusterManagerRemoteSiteHttpPort = null;
     private Boolean clusterManagerRemoteSiteCommsSecure = null;
 
     private ProcessGroup rootGroup;
@@ -398,7 +402,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             bulletinRepo,
             heartbeatMonitor);
 
-        
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), 
properties.isSiteToSiteSecure());
+        
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), 
properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
 
         return flowController;
     }
@@ -484,6 +488,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         gracefulShutdownSeconds = shutdownSecs;
 
         remoteInputSocketPort = properties.getRemoteInputPort();
+        remoteInputHttpPort = properties.getRemoteInputHttpPort();
         isSiteToSiteSecure = properties.isSiteToSiteSecure();
 
         if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort 
!= null) {
@@ -503,17 +508,24 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         controllerServiceProvider.setRootProcessGroup(rootGroup);
 
         if (remoteInputSocketPort == null) {
-            LOG.info("Not enabling Site-to-Site functionality because 
nifi.remote.input.socket.port is not set");
-            externalSiteListener = null;
+            LOG.info("Not enabling RAW Socket Site-to-Site functionality 
because nifi.remote.input.socket.port is not set");
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because 
not all required Keystore/Truststore "
                 + "Properties are set. Site-to-Site functionality will be 
disabled until this problem is has been fixed.");
-            externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate 
resource for site-to-site Server Protocol
             
RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME,
 SocketFlowFileServerProtocol.class);
-            externalSiteListener = new 
SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext 
: null);
-            externalSiteListener.setRootGroup(rootGroup);
+            externalSiteListeners.add(new 
SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext 
: null));
+        }
+
+        if (remoteInputHttpPort == null) {
+            LOG.info("Not enabling HTTP(S) Site-to-Site functionality because 
nifi.remote.input.html.enabled is not true");
+        } else {
+            externalSiteListeners.add(HttpRemoteSiteListener.getInstance());
+        }
+
+        for(RemoteSiteListener listener : externalSiteListeners) {
+            listener.setRootGroup(rootGroup);
         }
 
         // Determine frequency for obtaining component status snapshots
@@ -650,8 +662,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // ContentRepository to purge superfluous files
             contentRepository.cleanup();
 
-            if (externalSiteListener != null) {
-                externalSiteListener.start();
+            for(RemoteSiteListener listener : externalSiteListeners) {
+                listener.start();
             }
 
             notifyComponentsConfigurationRestored();
@@ -1288,8 +1300,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     + "will take an indeterminate amount of time to stop.  
Might need to kill the program manually.");
             }
 
-            if (externalSiteListener != null) {
-                externalSiteListener.stop();
+            for(RemoteSiteListener listener : externalSiteListeners) {
+                listener.stop();
             }
 
             if (processScheduler != null) {
@@ -1433,8 +1445,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         try {
             rootGroup = group;
 
-            if (externalSiteListener != null) {
-                externalSiteListener.setRootGroup(group);
+            for(RemoteSiteListener listener : externalSiteListeners) {
+                listener.setRootGroup(rootGroup);
             }
 
             controllerServiceProvider.setRootProcessGroup(rootGroup);
@@ -1661,6 +1673,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
                 
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
                 
remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
+                
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol()));
+                remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost());
+                remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort());
+                remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser());
+                
remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword());
                 remoteGroup.setProcessGroup(group);
 
                 // set the input/output ports
@@ -3778,10 +3795,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return new ArrayList<>(history.getActions());
     }
 
-    public void setClusterManagerRemoteSiteInfo(final Integer 
managerListeningPort, final Boolean commsSecure) {
+    public void setClusterManagerRemoteSiteInfo(final Integer 
managerListeningPort, final Integer managerListeningHttpPort, final Boolean 
commsSecure) {
         writeLock.lock();
         try {
             clusterManagerRemoteSitePort = managerListeningPort;
+            clusterManagerRemoteSiteHttpPort = managerListeningHttpPort;
             clusterManagerRemoteSiteCommsSecure = commsSecure;
         } finally {
             writeLock.unlock();
@@ -3797,6 +3815,16 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+
+    public Integer getClusterManagerRemoteSiteListeningHttpPort() {
+        readLock.lock();
+        try {
+            return clusterManagerRemoteSiteHttpPort;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
     public Boolean isClusterManagerRemoteSiteCommsSecure() {
         readLock.lock();
         try {
@@ -3810,6 +3838,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return remoteInputSocketPort;
     }
 
+    public Integer getRemoteSiteListeningHttpPort() {
+        return remoteInputHttpPort;
+    }
+
     public Boolean isRemoteSiteCommsSecure() {
         return isSiteToSiteSecure;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2f16f7f..922bff0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -205,7 +205,8 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             this.nodeId = new NodeIdentifier(nodeUuid,
                 nodeApiAddress.getHostName(), nodeApiAddress.getPort(),
                 nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(),
-                properties.getRemoteInputHost(), 
properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
+                properties.getRemoteInputHost(), 
properties.getRemoteInputPort(),
+                properties.getRemoteInputHttpPort(), 
properties.isSiteToSiteSecure());
 
         } else {
             this.configuredForClustering = false;
@@ -471,7 +472,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                     controller.setClustered(true, null);
                     clusterCoordinator.setConnected(false);
 
-                    controller.setClusterManagerRemoteSiteInfo(null, null);
+                    controller.setClusterManagerRemoteSiteInfo(null, null, 
null);
                     controller.setConnectionStatus(new 
NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
 
                     /*
@@ -585,8 +586,9 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // reconnect
             final ConnectionResponse connectionResponse = new 
ConnectionResponse(getNodeId(), request.getDataFlow(),
-                request.getManagerRemoteSiteListeningPort(), 
request.isManagerRemoteSiteCommsSecure(), request.getInstanceId(),
-                request.getNodeConnectionStatuses(), 
request.getComponentRevisions());
+                    request.getManagerRemoteSiteListeningPort(), 
request.getManagerRemoteSiteListeningHttpPort(),
+                    request.isManagerRemoteSiteCommsSecure(), 
request.getInstanceId(),
+                    request.getNodeConnectionStatuses(), 
request.getComponentRevisions());
 
             connectionResponse.setCoordinatorDN(request.getRequestorDN());
             loadFromConnectionResponse(connectionResponse);
@@ -848,7 +850,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // mark the node as clustered
             controller.setClustered(true, response.getInstanceId(), 
response.getCoordinatorDN());
-            
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(),
 response.isManagerRemoteCommsSecure());
+            
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(),
 response.getManagerRemoteInputHttpPort(), 
response.isManagerRemoteCommsSecure());
 
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 93b9761..6b938d6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -53,6 +53,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.reporting.Severity;
@@ -923,7 +924,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         // add remote process group
         final List<Element> remoteProcessGroupNodeList = 
getChildrenByTagName(processGroupElement, "remoteProcessGroup");
         for (final Element remoteProcessGroupElement : 
remoteProcessGroupNodeList) {
-            final RemoteProcessGroupDTO remoteGroupDto = 
FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement);
+            final RemoteProcessGroupDTO remoteGroupDto = 
FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
             final RemoteProcessGroup remoteGroup = 
controller.createRemoteProcessGroup(remoteGroupDto.getId(), 
remoteGroupDto.getTargetUri());
             remoteGroup.setComments(remoteGroupDto.getComments());
             remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition()));
@@ -938,6 +939,27 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                 
remoteGroup.setYieldDuration(remoteGroupDto.getYieldDuration());
             }
 
+            String transportProtocol = remoteGroupDto.getTransportProtocol();
+            if (transportProtocol != null && 
!transportProtocol.trim().isEmpty()) {
+                
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(transportProtocol.toUpperCase()));
+            }
+
+            if (remoteGroupDto.getProxyHost() != null) {
+                remoteGroup.setProxyHost(remoteGroupDto.getProxyHost());
+            }
+
+            if (remoteGroupDto.getProxyPort() != null) {
+                remoteGroup.setProxyPort(remoteGroupDto.getProxyPort());
+            }
+
+            if (remoteGroupDto.getProxyUser() != null) {
+                remoteGroup.setProxyUser(remoteGroupDto.getProxyUser());
+            }
+
+            if (remoteGroupDto.getProxyPassword() != null) {
+                
remoteGroup.setProxyPassword(remoteGroupDto.getProxyPassword());
+            }
+
             final Set<RemoteProcessGroupPortDescriptor> inputPorts = new 
HashSet<>();
             for (final Element portElement : 
getChildrenByTagName(remoteProcessGroupElement, "inputPort")) {
                 
inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
index bcf692d..b7a55ad 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateUtils.java
@@ -247,6 +247,7 @@ public class TemplateUtils {
             remoteProcessGroupDTO.setInputPortCount(null);
             remoteProcessGroupDTO.setOutputPortCount(null);
             remoteProcessGroupDTO.setTransmitting(null);
+            remoteProcessGroupDTO.setProxyPassword(null);
 
             // if this remote process group has contents
             if (remoteProcessGroupDTO.getContents() != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index f6de870..ef910ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -175,7 +175,7 @@ public class FlowFromDOMFactory {
 
         nodeList = DomUtils.getChildNodesByTagName(element, 
"remoteProcessGroup");
         for (int i = 0; i < nodeList.getLength(); i++) {
-            remoteProcessGroups.add(getRemoteProcessGroup((Element) 
nodeList.item(i)));
+            remoteProcessGroups.add(getRemoteProcessGroup((Element) 
nodeList.item(i), encryptor));
         }
 
         nodeList = DomUtils.getChildNodesByTagName(element, "connection");
@@ -246,7 +246,7 @@ public class FlowFromDOMFactory {
         return dto;
     }
 
-    public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element 
element) {
+    public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element 
element, final StringEncryptor encryptor) {
         final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
         dto.setId(getString(element, "id"));
         dto.setName(getString(element, "name"));
@@ -255,6 +255,13 @@ public class FlowFromDOMFactory {
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
         dto.setCommunicationsTimeout(getString(element, "timeout"));
         dto.setComments(getString(element, "comment"));
+        dto.setYieldDuration(getString(element, "yieldPeriod"));
+        dto.setTransportProtocol(getString(element, "transportProtocol"));
+        dto.setProxyHost(getString(element, "proxyHost"));
+        dto.setProxyPort(getOptionalInt(element, "proxyPort"));
+        dto.setProxyUser(getString(element, "proxyUser"));
+        String proxyPassword = decrypt(getString(element, "proxyPassword"), 
encryptor);
+        dto.setProxyPassword(proxyPassword);
 
         return dto;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index e26f550..e774637 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -56,6 +56,7 @@ import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.util.StringUtils;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
@@ -244,6 +245,16 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "timeout", 
remoteRef.getCommunicationsTimeout());
         addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
         addTextElement(element, "transmitting", 
String.valueOf(remoteRef.isTransmitting()));
+        addTextElement(element, "transportProtocol", 
remoteRef.getTransportProtocol().name());
+        addTextElement(element, "proxyHost", remoteRef.getProxyHost());
+        if (remoteRef.getProxyPort() != null) {
+            addTextElement(element, "proxyPort", remoteRef.getProxyPort());
+        }
+        addTextElement(element, "proxyUser", remoteRef.getProxyUser());
+        if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) {
+            String value = ENC_PREFIX + 
encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX;
+            addTextElement(element, "proxyPassword", value);
+        }
 
         for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
             if (port.hasIncomingConnection()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index cdabeca..fb9da32 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.remote;
 
 import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.ClientResponse.Status;
 import com.sun.jersey.api.client.UniformInterfaceException;
 import org.apache.nifi.authorization.Resource;
@@ -39,6 +38,9 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
@@ -46,13 +48,12 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.api.dto.ControllerDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.Response;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -82,13 +83,9 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteProcessGroup.class);
 
-    public static final String SITE_TO_SITE_URI_PATH = "/site-to-site";
-    public static final String ROOT_GROUP_STATUS_URI_PATH = 
"/flow/process-groups/root/status";
-
     // status codes
-    public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
-    public static final int UNAUTHORIZED_STATUS_CODE = 
Status.UNAUTHORIZED.getStatusCode();
-    public static final int FORBIDDEN_STATUS_CODE = 
Status.FORBIDDEN.getStatusCode();
+    private static final int UNAUTHORIZED_STATUS_CODE = 
Status.UNAUTHORIZED.getStatusCode();
+    private static final int FORBIDDEN_STATUS_CODE = 
Status.FORBIDDEN.getStatusCode();
 
     private final String id;
 
@@ -112,6 +109,12 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     private volatile String communicationsTimeout = "30 sec";
     private volatile String targetId;
     private volatile String yieldDuration = "10 sec";
+    private volatile SiteToSiteTransportProtocol transportProtocol = 
SiteToSiteTransportProtocol.RAW;
+    private volatile String proxyHost;
+    private volatile Integer proxyPort;
+    private volatile String proxyUser;
+    private volatile String proxyPassword;
+
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -127,6 +130,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     private Long refreshContentsTimestamp = null;
     private Boolean destinationSecure;
     private Integer listeningPort;
+    private Integer listeningHttpPort;
 
     private volatile String authorizationIssue;
 
@@ -235,6 +239,56 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         this.targetId = targetId;
     }
 
+    @Override
+    public void setTransportProtocol(final SiteToSiteTransportProtocol 
transportProtocol) {
+        this.transportProtocol = transportProtocol;
+    }
+
+    @Override
+    public SiteToSiteTransportProtocol getTransportProtocol() {
+        return transportProtocol;
+    }
+
+    @Override
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    @Override
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    @Override
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    @Override
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
+
+    @Override
+    public String getProxyUser() {
+        return proxyUser;
+    }
+
+    @Override
+    public void setProxyUser(String proxyUser) {
+        this.proxyUser = proxyUser;
+    }
+
+    @Override
+    public String getProxyPassword() {
+        return proxyPassword;
+    }
+
+    @Override
+    public void setProxyPassword(String proxyPassword) {
+        this.proxyPassword = proxyPassword;
+    }
+
     /**
      * @return the ID of the Root Group on the remote instance
      */
@@ -696,10 +750,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         return parent == null ? context : getRootGroup(parent);
     }
 
-    private boolean isWebApiSecure() {
-        return targetUri.toString().toLowerCase().startsWith("https");
-    }
-
     private void refreshFlowContentsFromLocal() {
         final ProcessGroup rootGroup = getRootGroup();
         setName(rootGroup.getName());
@@ -725,6 +775,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             final NiFiProperties props = NiFiProperties.getInstance();
             this.destinationSecure = props.isSiteToSiteSecure();
             this.listeningPort = props.getRemoteInputPort();
+            this.listeningHttpPort = props.getRemoteInputHttpPort();
 
             refreshContentsTimestamp = System.currentTimeMillis();
         } finally {
@@ -760,20 +811,14 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             return;
         }
 
-        final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? 
sslContext : null);
-        final String uriVal = apiUri.toString() + SITE_TO_SITE_URI_PATH;
-        URI uri;
-        try {
-            uri = new URI(uriVal);
-        } catch (final URISyntaxException e) {
-            throw new CommunicationsException("Invalid URI: " + uriVal);
-        }
-
         try {
             // perform the request
-            final ClientResponse response = utils.get(uri, 
getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-
-            if 
(!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily()))
 {
+            final ControllerDTO dto;
+            try (
+                final SiteToSiteRestApiClient apiClient = 
getSiteToSiteRestApiClient();
+            ){
+                dto = apiClient.getController();
+            } catch (IOException e) {
                 writeLock.lock();
                 try {
                     for (final Iterator<StandardRemoteGroupPort> iter = 
inputPorts.values().iterator(); iter.hasNext();) {
@@ -793,15 +838,9 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     writeLock.unlock();
                 }
 
-                // consume the entity entirely
-                response.getEntity(String.class);
-                throw new CommunicationsException("Unable to communicate with 
Remote NiFi at URI " + uriVal + ". Got HTTP Error Code "
-                        + response.getStatus() + ": " + 
response.getStatusInfo().getReasonPhrase());
+                throw new CommunicationsException("Unable to communicate with 
Remote NiFi at URI " + apiUri + " due to: " + e.getMessage());
             }
 
-            final ControllerEntity entity = 
response.getEntity(ControllerEntity.class);
-            final ControllerDTO dto = entity.getController();
-
             writeLock.lock();
             try {
                 if (dto.getInputPorts() != null) {
@@ -853,6 +892,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                 }
 
                 this.listeningPort = dto.getRemoteSiteListeningPort();
+                this.listeningHttpPort = dto.getRemoteSiteHttpListeningPort();
                 this.destinationSecure = dto.isSiteToSiteSecure();
 
                 final ProcessGroupCounts newCounts = new 
ProcessGroupCounts(inputPortCount, outputPortCount,
@@ -867,6 +907,14 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         }
     }
 
+    private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
+        SiteToSiteRestApiClient apiClient = new 
SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, 
proxyUser, proxyPassword));
+        apiClient.setBaseUrl(apiUri.toString());
+        
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+        
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+        return apiClient;
+    }
+
     /**
      * Converts a set of ports into a set of remote process group ports.
      *
@@ -1075,7 +1123,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     public boolean isSiteToSiteEnabled() {
         readLock.lock();
         try {
-            return this.listeningPort != null;
+            return (this.listeningPort != null || this.listeningHttpPort != 
null);
         } finally {
             readLock.unlock();
         }
@@ -1090,18 +1138,16 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
         @Override
         public void run() {
-            try {
-                final RemoteNiFiUtils utils = new 
RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
-                final ClientResponse response = utils.get(new URI(apiUri + 
SITE_TO_SITE_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-
-                final int statusCode = response.getStatus();
-
-                if (statusCode == OK_STATUS_CODE) {
-                    final ControllerEntity entity = 
response.getEntity(ControllerEntity.class);
-                    final ControllerDTO dto = entity.getController();
+            try (
+                final SiteToSiteRestApiClient apiClient = 
getSiteToSiteRestApiClient();
+            ){
+                try {
+                    final ControllerDTO dto = apiClient.getController();
 
-                    if (dto.getRemoteSiteListeningPort() == null) {
-                        authorizationIssue = "Remote instance is not 
configured to allow Site-to-Site communications at this time.";
+                    if (dto.getRemoteSiteListeningPort() == null && 
SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
+                        authorizationIssue = "Remote instance is not 
configured to allow RAW Site-to-Site communications at this time.";
+                    } else if (dto.getRemoteSiteHttpListeningPort() == null && 
SiteToSiteTransportProtocol.HTTP.equals(transportProtocol)) {
+                        authorizationIssue = "Remote instance is not 
configured to allow HTTP Site-to-Site communications at this time.";
                     } else {
                         authorizationIssue = null;
                     }
@@ -1109,6 +1155,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     writeLock.lock();
                     try {
                         listeningPort = dto.getRemoteSiteListeningPort();
+                        listeningHttpPort = 
dto.getRemoteSiteHttpListeningPort();
                         destinationSecure = dto.isSiteToSiteSecure();
                     } finally {
                         writeLock.unlock();
@@ -1117,31 +1164,39 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
                     final String remoteInstanceId = dto.getInstanceId();
                     final boolean isPointingToCluster = 
flowController.getInstanceId().equals(remoteInstanceId);
                     pointsToCluster.set(isPointingToCluster);
-                } else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
-                    try {
-                        final ClientResponse requestAccountResponse = 
utils.issueRegistrationRequest(apiUri.toString());
-                        if 
(Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()))
 {
-                            logger.info("{} Issued a Request to communicate 
with remote instance", this);
-                        } else {
-                            logger.error("{} Failed to request account: got 
unexpected response code of {}:{}", new Object[]{
-                                this, requestAccountResponse.getStatus(), 
requestAccountResponse.getStatusInfo().getReasonPhrase()});
-                        }
-                    } catch (final Exception e) {
-                        logger.error("{} Failed to request account due to {}", 
this, e.toString());
-                        if (logger.isDebugEnabled()) {
-                            logger.error("", e);
+
+                } catch (SiteToSiteRestApiClient.HttpGetFailedException e) {
+
+                    if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) {
+                        // TODO: implement registration request
+                        /*
+                        try {
+                            final ClientResponse requestAccountResponse = 
utils.issueRegistrationRequest(apiUri.toString());
+                            if 
(Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()))
 {
+                                logger.info("{} Issued a Request to 
communicate with remote instance", this);
+                            } else {
+                                logger.error("{} Failed to request account: 
got unexpected response code of {}:{}", new Object[]{
+                                    this, requestAccountResponse.getStatus(), 
requestAccountResponse.getStatusInfo().getReasonPhrase()});
+                            }
+                        } catch (final Exception e) {
+                            logger.error("{} Failed to request account due to 
{}", this, e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
                         }
-                    }
+                        */
+                        authorizationIssue = e.getDescription();
 
-                    authorizationIssue = response.getEntity(String.class);
-                } else if (statusCode == FORBIDDEN_STATUS_CODE) {
-                    authorizationIssue = response.getEntity(String.class);
-                } else {
-                    final String message = response.getEntity(String.class);
-                    logger.warn("{} When communicating with remote instance, 
got unexpected response code {}:{} with entity: {}",
-                            new Object[]{this, response.getStatus(), 
response.getStatusInfo().getReasonPhrase(), message});
-                    authorizationIssue = "Unable to determine Site-to-Site 
availability.";
+                    } else if (e.getResponseCode() == FORBIDDEN_STATUS_CODE) {
+                        authorizationIssue = e.getDescription();
+                    } else {
+                        final String message = e.getDescription();
+                        logger.warn("{} When communicating with remote 
instance, got unexpected result. {}",
+                                new Object[]{this, e.getMessage()});
+                        authorizationIssue = "Unable to determine Site-to-Site 
availability.";
+                  }
                 }
+
             } catch (final Exception e) {
                 logger.warn(String.format("Unable to connect to %s due to %s", 
StandardRemoteProcessGroup.this, e));
                 getEventReporter().reportEvent(Severity.WARNING, "Site to 
Site", String.format("Unable to connect to %s due to %s",

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 0017630..dfbc6fd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -111,9 +111,11 @@ 
nifi.components.status.repository.buffer.size=${nifi.components.status.repositor
 
nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
 # Site to Site properties
-nifi.remote.input.socket.host=
+nifi.remote.input.host=
+nifi.remote.input.secure=false
 nifi.remote.input.socket.port=
-nifi.remote.input.secure=true
+nifi.remote.input.http.enabled=true
+nifi.remote.input.http.transaction.ttl=30 sec
 
 # web properties #
 nifi.web.war.directory=${nifi.web.war.directory}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
new file mode 100644
index 0000000..acf7fc5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.util.NiFiProperties.DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL;
+import static 
org.apache.nifi.util.NiFiProperties.SITE_TO_SITE_HTTP_TRANSACTION_TTL;
+
+public class HttpRemoteSiteListener implements RemoteSiteListener {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpRemoteSiteListener.class);
+    private final int transactionTtlSec;
+    private static HttpRemoteSiteListener instance;
+
+    private final Map<String, TransactionWrapper> transactions = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService taskExecutor;
+    private final int httpListenPort;
+    private ProcessGroup rootGroup;
+    private ScheduledFuture<?> transactionMaintenanceTask;
+
+    private HttpRemoteSiteListener() {
+        super();
+        taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() 
{
+            private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = defaultFactory.newThread(r);
+                thread.setName("Http Site-to-Site Transaction Maintenance");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+
+        NiFiProperties properties = NiFiProperties.getInstance();
+        int txTtlSec;
+        try {
+            final String snapshotFrequency = 
properties.getProperty(SITE_TO_SITE_HTTP_TRANSACTION_TTL, 
DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL);
+            txTtlSec = (int) FormatUtils.getTimeDuration(snapshotFrequency, 
TimeUnit.SECONDS);
+        } catch (final Exception e) {
+            txTtlSec = (int) 
FormatUtils.getTimeDuration(DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL, 
TimeUnit.SECONDS);
+            logger.warn("Failed to parse {} due to {}, use default as {} 
secs.",
+                    SITE_TO_SITE_HTTP_TRANSACTION_TTL, e.getMessage(), 
txTtlSec);
+        }
+        transactionTtlSec = txTtlSec;
+
+        httpListenPort = properties.getRemoteInputHttpPort() != null ? 
properties.getRemoteInputHttpPort() : 0;
+
+    }
+
+    public static HttpRemoteSiteListener getInstance() {
+        if (instance == null) {
+            synchronized (HttpRemoteSiteListener.class) {
+                if (instance == null) {
+                    instance = new HttpRemoteSiteListener();
+                }
+            }
+        }
+        return instance;
+    }
+
+    private class TransactionWrapper {
+        private final FlowFileTransaction transaction;
+        private long lastCommunicationAt;
+
+        private TransactionWrapper(final FlowFileTransaction transaction) {
+            this.transaction = transaction;
+            this.lastCommunicationAt = System.currentTimeMillis();
+        }
+
+        private boolean isExpired() {
+            long elapsedMillis = System.currentTimeMillis() - 
lastCommunicationAt;
+            long elapsedSec = TimeUnit.SECONDS.convert(elapsedMillis, 
TimeUnit.MILLISECONDS);
+            return elapsedSec > transactionTtlSec;
+        }
+
+        private void extend() {
+            lastCommunicationAt = System.currentTimeMillis();
+        }
+    }
+
+    @Override
+    public void setRootGroup(ProcessGroup rootGroup) {
+        this.rootGroup = rootGroup;
+    }
+
+    public void setupServerProtocol(HttpFlowFileServerProtocol serverProtocol) 
{
+        serverProtocol.setRootProcessGroup(rootGroup);
+    }
+
+    @Override
+    public void start() throws IOException {
+        transactionMaintenanceTask = taskExecutor.scheduleWithFixedDelay(() -> 
{
+
+            int originalSize = transactions.size();
+            logger.trace("Transaction maintenance task started.");
+            try {
+                Set<String> transactionIds = 
transactions.keySet().stream().collect(Collectors.toSet());
+                transactionIds.stream().filter(tid -> 
!isTransactionActive(tid))
+                    .forEach(tid -> {
+                        cancelTransaction(tid);
+                    });
+            } catch (Exception e) {
+                // Swallow exception so that this thread can keep working.
+                logger.error("An exception occurred while maintaining 
transactions", e);
+            }
+            logger.debug("Transaction maintenance task finished. 
originalSize={}, currentSize={}", originalSize, transactions.size());
+
+        }, 0, transactionTtlSec / 2, TimeUnit.SECONDS);
+    }
+
+    public void cancelTransaction(String transactionId) {
+        TransactionWrapper wrapper = transactions.remove(transactionId);
+        if (wrapper == null) {
+            logger.debug("The transaction was not found. transactionId={}", 
transactionId);
+        } else {
+            logger.debug("Cancel a transaction. transactionId={}", 
transactionId);
+            FlowFileTransaction t = wrapper.transaction;
+            if(t != null && t.getSession() != null){
+                logger.info("Cancel a transaction, rollback its session. 
transactionId={}", transactionId);
+                try {
+                    t.getSession().rollback();
+                } catch (Exception e) {
+                    // Swallow exception so that it can keep expiring other 
transactions.
+                    logger.error("Failed to rollback. transactionId={}", 
transactionId, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public int getPort() {
+        return httpListenPort;
+    }
+
+    @Override
+    public void stop() {
+        if(transactionMaintenanceTask != null) {
+            logger.debug("Stopping transactionMaintenanceTask...");
+            transactionMaintenanceTask.cancel(true);
+        }
+    }
+
+    public String createTransaction() {
+        final String transactionId = UUID.randomUUID().toString();
+        transactions.put(transactionId, new TransactionWrapper(null));
+        logger.debug("Created a new transaction: {}", transactionId);
+        return transactionId;
+    }
+
+    public boolean isTransactionActive(final String transactionId) {
+        TransactionWrapper transaction = transactions.get(transactionId);
+        if (transaction == null) {
+            return false;
+        }
+        if (transaction.isExpired()) {
+            return false;
+        }
+        return true;
+    }
+
+    public void holdTransaction(final String transactionId, final 
FlowFileTransaction transaction) throws IllegalStateException {
+        // We don't check expiration of the transaction here, to support large 
file transport or slow network.
+        // The availability of current transaction is already checked when the 
HTTP request was received at SiteToSiteResource.
+        TransactionWrapper currentTransaction = 
transactions.remove(transactionId);
+        if (currentTransaction == null) {
+            logger.debug("The transaction was not found, it looks it took 
longer than transaction TTL.");
+        } else if (currentTransaction.transaction != null) {
+            throw new IllegalStateException("Transaction has already been 
processed. It can only be finalized. transactionId=" + transactionId);
+        }
+        if (transaction.getSession() == null) {
+            throw new IllegalStateException("Passed transaction is not 
associated any session yet, can not hold. transactionId=" + transactionId);
+        }
+        logger.debug("Holding a transaction: {}", transactionId);
+        // Server has received or sent all data, and transaction TTL count 
down starts here.
+        // However, if the client doesn't consume data fast enough, server 
might expire and rollback the transaction.
+        transactions.put(transactionId, new TransactionWrapper(transaction));
+    }
+
+    public FlowFileTransaction finalizeTransaction(final String transactionId) 
throws IllegalStateException {
+        if (!isTransactionActive(transactionId)){
+            throw new IllegalStateException("Transaction was not found or not 
active anymore. transactionId=" + transactionId);
+        }
+        TransactionWrapper transaction = transactions.remove(transactionId);
+        if (transaction == null) {
+            throw new IllegalStateException("Transaction was not found 
anymore. It's already finalized or expired. transactionId=" + transactionId);
+        }
+        if (transaction.transaction == null) {
+            throw new IllegalStateException("Transaction has not started 
yet.");
+        }
+        logger.debug("Finalized a transaction: {}", transactionId);
+        return transaction.transaction;
+    }
+
+    public void extendsTransaction(final String transactionId) throws 
IllegalStateException {
+        if (!isTransactionActive(transactionId)){
+            throw new IllegalStateException("Transaction was not found or not 
active anymore. transactionId=" + transactionId);
+        }
+        TransactionWrapper transaction = transactions.get(transactionId);
+        if (transaction != null) {
+            logger.debug("Extending transaction TTL, transactionId={}", 
transactionId);
+            transaction.extend();
+        }
+    }
+
+    public int getTransactionTtlSec() {
+        return transactionTtlSec;
+    }
+
+}

Reply via email to