NIFI-1857: HTTPS Site-to-Site

- Enable HTTP(S) for Site-to-Site communication
- Support HTTP Proxy in the middle of local and remote NiFi
- Support BASIC and DIGEST auth with Proxy Server
- Provide 2-phase style commit same as existing socket version
- [WIP] Testing with the latest cluster env (without NCM) hasn't been done yet

- Fixed buffer handling issues in async HTTP client POST
- Fixed JS error when applying Remote Process Group Port setting from UI
- Use compression setting from UI
- Removed already finished TODO comments

- Added additional buffer draining code after receiving EOF
- Added inspection and assert code to make sure Site-to-Site client has
  written data fully to output stream
- Changed default nifi.remote.input.secure from true to false

This closes #497.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c120c498
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c120c498
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c120c498

Branch: refs/heads/master
Commit: c120c4982d4fc811b06b672e3983b8ca5fb8ae64
Parents: a5fecda
Author: Koji Kawamura <[email protected]>
Authored: Mon Jun 6 22:19:26 2016 +0900
Committer: Mark Payne <[email protected]>
Committed: Thu Jun 9 15:09:57 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |   39 +-
 nifi-commons/nifi-site-to-site-client/pom.xml   |   19 +
 .../apache/nifi/remote/AbstractTransaction.java |  389 +++++++
 .../remote/ClientTransactionCompletion.java     |   57 +
 .../remote/client/AbstractSiteToSiteClient.java |   54 +
 .../apache/nifi/remote/client/PeerSelector.java |  341 ++++++
 .../nifi/remote/client/PeerStatusProvider.java  |   27 +
 .../nifi/remote/client/SiteInfoProvider.java    |  231 ++++
 .../nifi/remote/client/SiteToSiteClient.java    |   95 +-
 .../remote/client/SiteToSiteClientConfig.java   |   13 +
 .../nifi/remote/client/http/HttpClient.java     |  200 ++++
 .../TransportProtocolVersionNegotiator.java     |   42 +
 .../client/socket/EndpointConnectionPool.java   |  559 +---------
 .../nifi/remote/client/socket/SocketClient.java |   40 +-
 .../remote/cluster/AdaptedNodeInformation.java  |   11 +
 .../nifi/remote/cluster/NodeInformation.java    |   20 +-
 .../remote/cluster/NodeInformationAdapter.java  |    4 +-
 .../remote/exception/HandshakeException.java    |   15 +
 .../io/http/HttpCommunicationsSession.java      |   97 ++
 .../apache/nifi/remote/io/http/HttpInput.java   |   58 +
 .../apache/nifi/remote/io/http/HttpOutput.java  |   45 +
 .../http/HttpServerCommunicationsSession.java   |   72 ++
 .../nifi/remote/protocol/ClientProtocol.java    |   14 +-
 .../nifi/remote/protocol/HandshakeProperty.java |   59 +
 .../apache/nifi/remote/protocol/Response.java   |   52 +
 .../nifi/remote/protocol/ResponseCode.java      |  152 +++
 .../protocol/SiteToSiteTransportProtocol.java   |   22 +
 .../protocol/http/HttpClientTransaction.java    |  187 ++++
 .../nifi/remote/protocol/http/HttpHeaders.java  |   35 +
 .../nifi/remote/protocol/http/HttpProxy.java    |   59 +
 .../protocol/socket/HandshakeProperty.java      |   59 -
 .../nifi/remote/protocol/socket/Response.java   |   52 -
 .../remote/protocol/socket/ResponseCode.java    |  148 ---
 .../protocol/socket/SocketClientProtocol.java   |  173 +--
 .../socket/SocketClientTransaction.java         |  345 +-----
 .../SocketClientTransactionCompletion.java      |   57 -
 .../nifi/remote/util/EventReportUtil.java       |   50 +
 .../nifi/remote/util/NiFiRestApiUtil.java       |  100 --
 .../remote/util/SiteToSiteRestApiClient.java    |  992 +++++++++++++++++
 .../nifi/remote/client/TestPeerSelector.java    |  125 +++
 .../nifi/remote/client/http/TestHttpClient.java |  950 ++++++++++++++++
 .../socket/TestEndpointConnectionStatePool.java |   92 --
 .../remote/protocol/SiteToSiteTestUtils.java    |  237 ++++
 .../http/TestHttpClientTransaction.java         |  346 ++++++
 .../socket/TestSocketClientTransaction.java     |  334 ++++++
 .../src/main/asciidoc/administration-guide.adoc |   12 +-
 .../src/main/asciidoc/getting-started.adoc      |    1 +
 .../images/configure-remote-process-group.png   |  Bin 0 -> 36406 bytes
 nifi-docs/src/main/asciidoc/user-guide.adoc     |   37 +-
 .../apache/nifi/web/api/dto/ControllerDTO.java  |   19 +
 .../nifi/web/api/dto/RemoteProcessGroupDTO.java |   45 +
 .../apache/nifi/web/api/dto/remote/PeerDTO.java |   78 ++
 .../apache/nifi/web/api/entity/PeersEntity.java |   46 +
 .../web/api/entity/TransactionResultEntity.java |   53 +
 .../cluster/protocol/ConnectionResponse.java    |   10 +-
 .../nifi/cluster/protocol/NodeIdentifier.java   |   26 +-
 .../jaxb/message/AdaptedConnectionResponse.java |    9 +
 .../jaxb/message/AdaptedNodeIdentifier.java     |   11 +
 .../jaxb/message/ConnectionResponseAdapter.java |    3 +-
 .../jaxb/message/NodeIdentifierAdapter.java     |    3 +-
 .../message/ReconnectionRequestMessage.java     |    9 +
 .../jaxb/message/TestJaxbProtocolUtils.java     |    4 +-
 .../node/NodeClusterCoordinator.java            |    9 +-
 .../heartbeat/TestAbstractHeartbeatMonitor.java |    2 +-
 .../endpoints/TestProcessorEndpointMerger.java  |    4 +-
 .../http/replication/TestResponseUtils.java     |    8 +-
 .../TestThreadPoolRequestReplicator.java        |   24 +-
 .../node/TestNodeClusterCoordinator.java        |    6 +-
 .../apache/nifi/groups/RemoteProcessGroup.java  |   21 +
 .../org/apache/nifi/remote/RootGroupPort.java   |   11 +-
 .../apache/nifi/controller/FlowController.java  |   60 +-
 .../nifi/controller/StandardFlowService.java    |   12 +-
 .../controller/StandardFlowSynchronizer.java    |   24 +-
 .../apache/nifi/controller/TemplateUtils.java   |    1 +
 .../serialization/FlowFromDOMFactory.java       |   11 +-
 .../serialization/StandardFlowSerializer.java   |   11 +
 .../nifi/remote/StandardRemoteProcessGroup.java |  187 ++--
 .../src/main/resources/conf/nifi.properties     |    6 +-
 .../nifi/remote/HttpRemoteSiteListener.java     |  243 ++++
 .../nifi/remote/SocketRemoteSiteListener.java   |   32 +-
 .../nifi/remote/StandardRemoteGroupPort.java    |    4 +
 .../nifi/remote/StandardRootGroupPort.java      |    8 +-
 .../AbstractFlowFileServerProtocol.java         |  559 ++++++++++
 .../remote/protocol/FlowFileTransaction.java    |   71 ++
 .../remote/protocol/HandshakenProperties.java   |   96 ++
 .../http/HttpFlowFileServerProtocol.java        |   30 +
 .../http/HttpFlowFileServerProtocolImpl.java    |  223 ++++
 .../socket/ClusterManagerServerProtocol.java    |    2 +
 .../socket/SocketFlowFileServerProtocol.java    |  529 +--------
 .../nifi/remote/TestHttpRemoteSiteListener.java |  101 ++
 .../http/TestHttpFlowFileServerProtocol.java    |  589 ++++++++++
 .../nifi/web/StandardNiFiServiceFacade.java     |    2 +
 .../nifi/web/api/ApplicationResource.java       |    2 +-
 .../apache/nifi/web/api/SiteToSiteResource.java | 1037 +++++++++++++++++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |   22 +-
 .../nifi/web/controller/ControllerFacade.java   |   22 +
 .../dao/impl/StandardRemoteProcessGroupDAO.java |   85 +-
 .../nifi/web/api/TestSiteToSiteResource.java    |  516 +++++++++
 .../resources/access-control/nifi.properties    |    2 +-
 .../test/resources/site-to-site/nifi.properties |  174 +++
 .../canvas/new-remote-process-group-dialog.jsp  |   72 ++
 .../remote-process-group-configuration.jsp      |   69 +-
 .../canvas/remote-process-group-details.jsp     |   69 +-
 .../nifi-web-ui/src/main/webapp/css/dialog.css  |   17 +-
 .../css/remote-process-group-configuration.css  |   28 +-
 .../nf-ng-remote-process-group-component.js     |   71 +-
 .../nf-remote-process-group-configuration.js    |   33 +-
 .../canvas/nf-remote-process-group-details.js   |   10 +
 .../nf/canvas/nf-remote-process-group-ports.js  |    3 +-
 109 files changed, 10283 insertions(+), 2269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 8021cca..bb74b25 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -60,9 +60,11 @@ public class NiFiProperties extends Properties {
     public static final String SENSITIVE_PROPS_ALGORITHM = 
"nifi.sensitive.props.algorithm";
     public static final String SENSITIVE_PROPS_PROVIDER = 
"nifi.sensitive.props.provider";
     public static final String H2_URL_APPEND = "nifi.h2.url.append";
-    public static final String REMOTE_INPUT_HOST = 
"nifi.remote.input.socket.host";
+    public static final String REMOTE_INPUT_HOST = "nifi.remote.input.host";
     public static final String REMOTE_INPUT_PORT = 
"nifi.remote.input.socket.port";
     public static final String SITE_TO_SITE_SECURE = 
"nifi.remote.input.secure";
+    public static final String SITE_TO_SITE_HTTP_ENABLED = 
"nifi.remote.input.http.enabled";
+    public static final String SITE_TO_SITE_HTTP_TRANSACTION_TTL = 
"nifi.remote.input.http.transaction.ttl";
     public static final String TEMPLATE_DIRECTORY = "nifi.templates.directory";
     public static final String ADMINISTRATIVE_YIELD_DURATION = 
"nifi.administrative.yield.duration";
     public static final String PERSISTENT_STATE_DIRECTORY = 
"nifi.persistent.state.directory";
@@ -210,6 +212,7 @@ public class NiFiProperties extends Properties {
     public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
     public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
     public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
+    public static final String DEFAULT_SITE_TO_SITE_HTTP_TRANSACTION_TTL = "30 
secs";
 
     // cluster common defaults
     public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = 
"5 sec";
@@ -387,7 +390,7 @@ public class NiFiProperties extends Properties {
     /**
      * The socket port to listen on for a Remote Input Port.
      *
-     * @return the remote input port
+     * @return the remote input port for RAW socket communication
      */
     public Integer getRemoteInputPort() {
         return getPropertyAsPort(REMOTE_INPUT_PORT, DEFAULT_REMOTE_INPUT_PORT);
@@ -408,6 +411,38 @@ public class NiFiProperties extends Properties {
     }
 
     /**
+     * @return True if property value is 'true'; False otherwise.
+     */
+    public Boolean isSiteToSiteHttpEnabled() {
+        // A missing property falls back to "false". The comparison ignores case, so only
+        // a value equal to "true" (in any letter case) enables HTTP(S) Site-to-Site.
+        return "true".equalsIgnoreCase(getProperty(SITE_TO_SITE_HTTP_ENABLED, "false"));
+    }
+
+    /**
+     * The HTTP or HTTPS Web API port for a Remote Input Port.
+     * @return the remote input port for HTTP(S) communication, or null if 
HTTP(S) Site-to-Site is not enabled
+     */
+    public Integer getRemoteInputHttpPort() {
+        // Nothing to report when HTTP(S) Site-to-Site is switched off.
+        if (!isSiteToSiteHttpEnabled()) {
+            return null;
+        }
+
+        // Secure Site-to-Site shares the HTTPS web port; otherwise the plain HTTP web port is used.
+        final String portPropertyName;
+        if (isSiteToSiteSecure()) {
+            portPropertyName = NiFiProperties.WEB_HTTPS_PORT;
+        } else {
+            portPropertyName = NiFiProperties.WEB_HTTP_PORT;
+        }
+
+        // 0 is the default passed to the lookup, so it marks an unspecified port.
+        final Integer configuredPort = getIntegerProperty(portPropertyName, 0);
+        if (configuredPort != 0) {
+            return configuredPort;
+        }
+
+        final String scheme = "Remote input HTTP" + (isSiteToSiteSecure() ? "S" : "");
+        throw new RuntimeException(scheme + " is enabled but " + portPropertyName + " is not specified.");
+    }
+
+    /**
      * Returns the directory to which Templates are to be persisted
      *
      * @return the template directory

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml 
b/nifi-commons/nifi-site-to-site-client/pom.xml
index 987e42b..4bd0913 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -44,6 +44,15 @@
             <artifactId>nifi-client-dto</artifactId>
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+            <version>4.1.1</version>
+        </dependency>
 
         <dependency>
             <groupId>junit</groupId>
@@ -56,5 +65,15 @@
             <version>2.24.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
new file mode 100644
index 0000000..3e700aa
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+public abstract class AbstractTransaction implements Transaction {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    protected final Peer peer;
+    protected final TransferDirection direction;
+    private final CRC32 crc = new CRC32();
+    private final boolean compress;
+    protected final FlowFileCodec codec;
+    protected final EventReporter eventReporter;
+    protected final int protocolVersion;
+    private final int penaltyMillis;
+    protected final String destinationId;
+    protected TransactionState state;
+    protected boolean dataAvailable = false;
+    private final long creationNanoTime = System.nanoTime();
+    private int transfers = 0;
+    private long contentBytes = 0;
+
+    /**
+     * Creates a transaction in the TRANSACTION_STARTED state.
+     *
+     * @param peer the peer this transaction communicates with
+     * @param direction whether this transaction sends or receives data
+     * @param useCompression if true, data is read and written through compression streams
+     * @param codec the codec used to encode and decode data packets
+     * @param eventReporter reporter for error events; a null check guards its use in confirm()
+     * @param protocolVersion the protocol version; the CRC check in confirm() applies only to versions above 3
+     * @param penaltyMillis the penalty passed to the peer when it reports that the destination is full
+     * @param destinationId the destination identifier passed to the peer when it is penalized
+     */
+    public AbstractTransaction(final Peer peer, final TransferDirection 
direction, final boolean useCompression,
+                               final FlowFileCodec codec, final EventReporter 
eventReporter, final int protocolVersion,
+                               final int penaltyMillis, final String 
destinationId) {
+        this.peer = peer;
+        this.state = TransactionState.TRANSACTION_STARTED;
+        this.direction = direction;
+        this.compress = useCompression;
+        this.codec = codec;
+        this.eventReporter = eventReporter;
+        this.protocolVersion = protocolVersion;
+        this.penaltyMillis = penaltyMillis;
+        this.destinationId = destinationId;
+    }
+
+    /**
+     * Hook invoked from error(), complete() and cancel(). This base implementation does nothing.
+     *
+     * @throws IOException declared so that overriding implementations may fail while closing
+     */
+    protected void close() throws IOException {
+    }
+
+    /**
+     * Wraps the given bytes and attributes in a data packet and sends it.
+     *
+     * @param content the packet content; its length is used as the packet size
+     * @param attributes the attributes to attach to the packet
+     * @throws IOException if sending the packet fails
+     */
+    @Override
+    public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+        final InputStream contentStream = new ByteArrayInputStream(content);
+        final DataPacket packet = new StandardDataPacket(attributes, contentStream, content.length);
+        send(packet);
+    }
+
+    /**
+     * Marks this transaction as failed and closes it.
+     * A failure while closing is logged and not propagated to the caller.
+     */
+    @Override
+    public void error() {
+        this.state = TransactionState.ERROR;
+        try {
+            close();
+        } catch (IOException e) {
+            logger.warn("Failed to close transaction due to {}", 
e.getMessage());
+            // The stack trace is emitted (at WARN level) only when debug logging is enabled.
+            if (logger.isDebugEnabled()) {
+                logger.warn("", e);
+            }
+        }
+    }
+
+    /**
+     * @return the current state of this transaction
+     */
+    @Override
+    public TransactionState getState() {
+        return this.state;
+    }
+
+    /**
+     * @return the peer this transaction communicates with
+     */
+    @Override
+    public Communicant getCommunicant() {
+        return this.peer;
+    }
+
+    /**
+     * Receives the next data packet from the peer.
+     * <p>
+     * Only valid for a transaction that is not a SEND transaction and whose state is
+     * TRANSACTION_STARTED or DATA_EXCHANGED. After the first packet, the peer's response code
+     * is read first to learn whether more data follows. Incoming bytes pass through the shared
+     * CRC32 so that the checksum can be reported in confirm(). Any exception puts the
+     * transaction into the ERROR state before it is rethrown.
+     *
+     * @return the next packet, or null if no data is available
+     * @throws IOException if reading from the peer fails
+     */
+    @Override
+    public final DataPacket receive() throws IOException {
+        try {
+            try {
+                if (state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
+                    throw new IllegalStateException("Cannot receive data from 
" + peer + " because Transaction State is " + state);
+                }
+
+                if (direction == TransferDirection.SEND) {
+                    throw new IllegalStateException("Attempting to receive 
data from " + peer + " but started a SEND Transaction");
+                }
+
+                // if we already know there's no data, just return null
+                if (!dataAvailable) {
+                    return null;
+                }
+
+                // if we have already received a packet, check if another is 
available.
+                if (transfers > 0) {
+                    // Determine if Peer will send us data or has no data to 
send us
+                    final Response dataAvailableCode = 
readTransactionResponse();
+                    switch (dataAvailableCode.getCode()) {
+                        case CONTINUE_TRANSACTION:
+                            logger.debug("{} {} Indicates Transaction should 
continue", this, peer);
+                            this.dataAvailable = true;
+                            break;
+                        case FINISH_TRANSACTION:
+                            logger.debug("{} {} Indicates Transaction should 
finish", this, peer);
+                            this.dataAvailable = false;
+                            break;
+                        default:
+                            throw new ProtocolException("Got unexpected 
response from " + peer + " when asking for data: " + dataAvailableCode);
+                    }
+                }
+
+                // if no data available, return null
+                if (!dataAvailable) {
+                    return null;
+                }
+
+                logger.debug("{} Receiving data from {}", this, peer);
+                final InputStream is = 
peer.getCommunicationsSession().getInput().getInputStream();
+                final InputStream dataIn = compress ? new 
CompressionInputStream(is) : is;
+                final DataPacket packet = codec.decode(new 
CheckedInputStream(dataIn, crc));
+
+                // A null packet from the codec is treated as the end of the data.
+                if (packet == null) {
+                    this.dataAvailable = false;
+                } else {
+                    transfers++;
+                    contentBytes += packet.getSize();
+                }
+
+                this.state = TransactionState.DATA_EXCHANGED;
+                return packet;
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to receive data from " + peer + 
" due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    /**
+     * Reads the next protocol response sent by the peer.
+     *
+     * @return the response received from the peer
+     * @throws IOException if the response cannot be read
+     */
+    abstract protected Response readTransactionResponse() throws IOException;
+
+    /**
+     * Sends a response code to the peer without an explanation (null is passed as the explanation).
+     *
+     * @param response the response code to send
+     * @throws IOException if the response cannot be written
+     */
+    protected final void writeTransactionResponse(ResponseCode response) 
throws IOException {
+        writeTransactionResponse(response, null);
+    }
+    /**
+     * Sends a response code and an accompanying explanation to the peer.
+     *
+     * @param response the response code to send
+     * @param explanation the text sent with the code; may be null
+     * @throws IOException if the response cannot be written
+     */
+    abstract protected void writeTransactionResponse(ResponseCode response, 
String explanation) throws IOException;
+
+    @Override
+    public final void confirm() throws IOException {
+        try {
+            try {
+                if (state == TransactionState.TRANSACTION_STARTED && 
!dataAvailable && direction == TransferDirection.RECEIVE) {
+                    // client requested to receive data but no data available. 
no need to confirm.
+                    state = TransactionState.TRANSACTION_CONFIRMED;
+                    return;
+                }
+
+                if (state != TransactionState.DATA_EXCHANGED) {
+                    throw new IllegalStateException("Cannot confirm 
Transaction because state is " + state
+                            + "; Transaction can only be confirmed when state 
is " + TransactionState.DATA_EXCHANGED);
+                }
+
+                final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+                if (direction == TransferDirection.RECEIVE) {
+                    if (dataAvailable) {
+                        throw new IllegalStateException("Cannot complete 
transaction because the sender has already sent more data than client has 
consumed.");
+                    }
+
+                    // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
+                    // to peer so that we can verify that the connection is 
still open. This is a two-phase commit,
+                    // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the
+                    // session and then when we send the response back to the 
peer, the peer may have timed out and may not
+                    // be listening. As a result, it will re-send the data. By 
doing this two-phase commit, we narrow the
+                    // Critical Section involved in this transaction so that 
rather than the Critical Section being the
+                    // time window involved in the entire transaction, it is 
reduced to a simple round-trip conversation.
+                    logger.trace("{} Sending CONFIRM_TRANSACTION Response Code 
to {}", this, peer);
+                    final String calculatedCRC = 
String.valueOf(crc.getValue());
+                    writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, 
calculatedCRC);
+
+                    final Response confirmTransactionResponse;
+                    try {
+                        confirmTransactionResponse = readTransactionResponse();
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to receive response code from {} 
when expecting confirmation of transaction", peer);
+                        if (eventReporter != null) {
+                            eventReporter.reportEvent(Severity.ERROR, 
"Site-to-Site", "Failed to receive response code from " + peer + " when 
expecting confirmation of transaction");
+                        }
+                        throw ioe;
+                    }
+
+                    logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
+
+                    switch (confirmTransactionResponse.getCode()) {
+                        case CONFIRM_TRANSACTION:
+                            break;
+                        case BAD_CHECKSUM:
+                            throw new IOException(this + " Received a 
BadChecksum response from peer " + peer);
+                        default:
+                            throw new ProtocolException(this + " Received 
unexpected Response from peer " + peer + " : "
+                                    + confirmTransactionResponse + "; expected 
'Confirm Transaction' Response Code");
+                    }
+
+                    state = TransactionState.TRANSACTION_CONFIRMED;
+                } else {
+                    logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", 
this, peer);
+                    writeTransactionResponse(ResponseCode.FINISH_TRANSACTION);
+
+                    final String calculatedCRC = 
String.valueOf(crc.getValue());
+
+                    // we've sent a FINISH_TRANSACTION. Now we'll wait for the 
peer to send a 'Confirm Transaction' response
+                    final Response transactionConfirmationResponse = 
readTransactionResponse();
+                    if (transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION) {
+                        // Confirm checksum and echo back the confirmation.
+                        logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
+                        final String receivedCRC = 
transactionConfirmationResponse.getMessage();
+
+                        // CRC was not used before version 4
+                        if (protocolVersion > 3) {
+                            if (!receivedCRC.equals(calculatedCRC)) {
+                                
writeTransactionResponse(ResponseCode.BAD_CHECKSUM);
+                                throw new IOException(this + " Sent data to 
peer " + peer + " but calculated CRC32 Checksum as "
+                                        + calculatedCRC + " while peer 
calculated CRC32 Checksum as "
+                                        + receivedCRC + "; canceling 
transaction and rolling back session");
+                            }
+                        }
+
+                        
writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, "");
+                    } else {
+                        throw new ProtocolException("Expected to receive 
'Confirm Transaction' response from peer "
+                                + peer + " but received " + 
transactionConfirmationResponse);
+                    }
+
+                    state = TransactionState.TRANSACTION_CONFIRMED;
+                }
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to confirm transaction with " + 
peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+
+    /**
+     * Completes a confirmed transaction, the second phase of the two-phase commit.
+     * <p>
+     * For a RECEIVE transaction that transferred data, TRANSACTION_FINISHED is sent to the peer.
+     * For a SEND transaction, the peer's final response is read; a
+     * TRANSACTION_FINISHED_BUT_DESTINATION_FULL response penalizes the peer and sets the backoff flag.
+     * close() is always invoked when this method exits. On failure error() also invokes close(),
+     * so close() may be called twice in that case.
+     *
+     * @return a summary of the packets and bytes transferred and the elapsed time
+     * @throws IOException if communication with the peer fails
+     * @throws IllegalStateException if the state is not TRANSACTION_CONFIRMED
+     */
+    @Override
+    public final TransactionCompletion complete() throws IOException {
+        try {
+            try {
+                if (state != TransactionState.TRANSACTION_CONFIRMED) {
+                    throw new IllegalStateException("Cannot complete 
transaction with " + peer + " because state is " + state
+                            + "; Transaction can only be completed when state 
is " + TransactionState.TRANSACTION_CONFIRMED);
+                }
+
+                boolean backoff = false;
+                if (direction == TransferDirection.RECEIVE) {
+                    // Nothing was received, so there is nothing to acknowledge to the peer.
+                    if (transfers == 0) {
+                        state = TransactionState.TRANSACTION_COMPLETED;
+                        return new ClientTransactionCompletion(false, 0, 0L, 
System.nanoTime() - creationNanoTime);
+                    }
+
+                    // Confirm that we received the data and the peer can now 
discard it
+                    logger.debug("{} Sending TRANSACTION_FINISHED to {}", 
this, peer);
+                    
writeTransactionResponse(ResponseCode.TRANSACTION_FINISHED);
+
+                    state = TransactionState.TRANSACTION_COMPLETED;
+                } else {
+                    final Response transactionResponse;
+                    try {
+                        transactionResponse = readTransactionResponse();
+                    } catch (final IOException e) {
+                        throw new IOException(this + " Failed to receive a 
response from " + peer + " when expecting a TransactionFinished Indicator. "
+                                + "It is unknown whether or not the peer 
successfully received/processed the data.", e);
+                    }
+
+                    logger.debug("{} Received {} from {}", this, 
transactionResponse, peer);
+                    if (transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
+                        peer.penalize(destinationId, penaltyMillis);
+                        backoff = true;
+                    } else if (transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED) {
+                        throw new ProtocolException("After sending data to " + 
peer + ", expected TRANSACTION_FINISHED response but got " + 
transactionResponse);
+                    }
+
+                    state = TransactionState.TRANSACTION_COMPLETED;
+                }
+
+                return new ClientTransactionCompletion(backoff, transfers, 
contentBytes, System.nanoTime() - creationNanoTime);
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to complete transaction with " + 
peer + " due to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        } finally {
+            close();
+        }
+    }
+
+    /**
+     * Cancels the transaction by sending CANCEL_TRANSACTION to the peer.
+     * close() is always invoked afterwards. If sending fails, error() is invoked first,
+     * which also calls close().
+     *
+     * @param explanation the reason sent to the peer; a placeholder text is sent when null
+     * @throws IOException if the cancel message cannot be sent
+     * @throws IllegalStateException if the transaction is already canceled, completed or in error
+     */
+    @Override
+    public final void cancel(final String explanation) throws IOException {
+        if (state == TransactionState.TRANSACTION_CANCELED || state == 
TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) {
+            throw new IllegalStateException("Cannot cancel transaction because 
state is already " + state);
+        }
+
+        try {
+            writeTransactionResponse(ResponseCode.CANCEL_TRANSACTION, 
explanation == null ? "<No explanation given>" : explanation);
+            state = TransactionState.TRANSACTION_CANCELED;
+        } catch (final IOException ioe) {
+            error();
+            throw new IOException("Failed to send 'cancel transaction' message 
to " + peer + " due to " + ioe, ioe);
+        } finally {
+            close();
+        }
+    }
+
+    @Override
+    public final String toString() {
+        return getClass().getSimpleName() + "[Url=" + peer.getUrl() + ", 
TransferDirection=" + direction + ", State=" + state + "]";
+    }
+
+    /**
+     * Sends a data packet to the peer.
+     * <p>
+     * Only valid for a transaction that is not a RECEIVE transaction and whose state is
+     * TRANSACTION_STARTED or DATA_EXCHANGED. From the second packet on, CONTINUE_TRANSACTION
+     * is written before the packet. Outgoing bytes pass through the shared CRC32 so that the
+     * checksum can be compared in confirm(). Any exception puts the transaction into the
+     * ERROR state before it is rethrown.
+     *
+     * @param dataPacket the packet to encode and send
+     * @throws IOException if writing to the peer fails
+     */
+    @Override
+    public final void send(final DataPacket dataPacket) throws IOException {
+        try {
+            try {
+                if (state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
+                    throw new IllegalStateException("Cannot send data to " + 
peer + " because Transaction State is " + state);
+                }
+
+                if (direction == TransferDirection.RECEIVE) {
+                    throw new IllegalStateException("Attempting to send data 
to " + peer + " but started a RECEIVE Transaction");
+                }
+
+                if (transfers > 0) {
+                    
writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION);
+                }
+
+                logger.debug("{} Sending data to {}", this, peer);
+
+                final OutputStream os = 
peer.getCommunicationsSession().getOutput().getOutputStream();
+                final OutputStream dataOut = compress ? new 
CompressionOutputStream(os) : os;
+                final OutputStream out = new CheckedOutputStream(dataOut, crc);
+
+                codec.encode(dataPacket, out);
+
+                // need to close the CompressionOutputStream in order to force 
it write out any remaining bytes.
+                // Otherwise, do NOT close it because we don't want to close 
the underlying stream
+                // (CompressionOutputStream will not close the underlying 
stream when it's closed)
+                if (compress) {
+                    out.close();
+                }
+
+                transfers++;
+                contentBytes += dataPacket.getSize();
+                this.state = TransactionState.DATA_EXCHANGED;
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to send data to " + peer + " due 
to " + ioe, ioe);
+            }
+        } catch (final Exception e) {
+            error();
+            throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java
new file mode 100644
index 0000000..9778ab4
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/ClientTransactionCompletion.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.TransactionCompletion;
+
+/**
+ * Immutable {@link TransactionCompletion} that reports exactly the values handed to its constructor.
+ */
+public class ClientTransactionCompletion implements TransactionCompletion {
+
+    private final boolean backoff;
+    private final int dataPacketsTransferred;
+    private final long bytesTransferred;
+    private final long durationNanos;
+
+    /**
+     * @param backoff value returned by {@link #isBackoff()}
+     * @param dataPacketsTransferred value returned by {@link #getDataPacketsTransferred()}
+     * @param bytesTransferred value returned by {@link #getBytesTransferred()}
+     * @param durationNanos duration in nanoseconds, converted on demand by {@link #getDuration(TimeUnit)}
+     */
+    public ClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
+        this.backoff = backoff;
+        this.dataPacketsTransferred = dataPacketsTransferred;
+        this.bytesTransferred = bytesTransferred;
+        this.durationNanos = durationNanos;
+    }
+
+    @Override
+    public boolean isBackoff() {
+        return this.backoff;
+    }
+
+    @Override
+    public int getDataPacketsTransferred() {
+        return this.dataPacketsTransferred;
+    }
+
+    @Override
+    public long getBytesTransferred() {
+        return this.bytesTransferred;
+    }
+
+    /**
+     * @param timeUnit unit to express the duration in
+     * @return the stored nanosecond duration converted to {@code timeUnit}
+     */
+    @Override
+    public long getDuration(final TimeUnit timeUnit) {
+        final long nanos = this.durationNanos;
+        return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
new file mode 100644
index 0000000..0dec3df
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractSiteToSiteClient implements SiteToSiteClient {
+
+    protected final SiteToSiteClientConfig config;
+    protected final SiteInfoProvider siteInfoProvider;
+    protected final URI clusterUrl;
+
+    public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) {
+        this.config = config;
+
+        try {
+            Objects.requireNonNull(config.getUrl(), "URL cannot be null");
+            clusterUrl = new URI(config.getUrl());
+        } catch (final URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid Cluster URL: " + 
config.getUrl());
+        }
+
+        final int commsTimeout = (int) 
config.getTimeout(TimeUnit.MILLISECONDS);
+        siteInfoProvider = new SiteInfoProvider();
+        siteInfoProvider.setClusterUrl(clusterUrl);
+        siteInfoProvider.setSslContext(config.getSslContext());
+        siteInfoProvider.setConnectTimeoutMillis(commsTimeout);
+        siteInfoProvider.setReadTimeoutMillis(commsTimeout);
+        siteInfoProvider.setProxy(config.getHttpProxy());
+
+    }
+
+    @Override
+    public SiteToSiteClientConfig getConfig() {
+        return config;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
new file mode 100644
index 0000000..b67e014
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
+
+public class PeerSelector {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PeerSelector.class);
+    private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+
+    private static final long PEER_REFRESH_PERIOD = 60000L;
+
+    private final ReentrantLock peerRefreshLock = new ReentrantLock();
+    private volatile List<PeerStatus> peerStatuses;
+    private volatile long peerRefreshTime = 0L;
+    private final AtomicLong peerIndex = new AtomicLong(0L);
+    private volatile PeerStatusCache peerStatusCache;
+    private final File persistenceFile;
+
+    private EventReporter eventReporter;
+
+    private final PeerStatusProvider peerStatusProvider;
+    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations 
= new ConcurrentHashMap<>();
+
+    public PeerSelector(final PeerStatusProvider peerStatusProvider, final 
File persistenceFile) {
+        this.peerStatusProvider = peerStatusProvider;
+        this.persistenceFile = persistenceFile;
+        Set<PeerStatus> recoveredStatuses;
+        if (persistenceFile != null && persistenceFile.exists()) {
+            try {
+                recoveredStatuses = 
recoverPersistedPeerStatuses(persistenceFile);
+                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, 
persistenceFile.lastModified());
+            } catch (final IOException ioe) {
+                logger.warn("Failed to recover peer statuses from {} due to 
{}; will continue without loading information from file", persistenceFile, ioe);
+            }
+        } else {
+            peerStatusCache = null;
+        }
+    }
+
+    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
+        if (persistenceFile == null) {
+            return;
+        }
+
+        try (final OutputStream fos = new FileOutputStream(persistenceFile);
+             final OutputStream out = new BufferedOutputStream(fos)) {
+
+            for (final PeerStatus status : statuses) {
+                final PeerDescription description = 
status.getPeerDescription();
+                final String line = description.getHostname() + ":" + 
description.getPort() + ":" + description.isSecure() + "\n";
+                out.write(line.getBytes(StandardCharsets.UTF_8));
+            }
+
+        } catch (final IOException e) {
+            error(logger, eventReporter, "Failed to persist list of Peers due 
to {}; if restarted and peer's NCM is down," +
+                    " may be unable to transfer data until communications with 
NCM are restored", e.toString());
+            logger.error("", e);
+        }
+    }
+
+    /**
+     * Reads peers from a file of {@code hostname:port:secure} lines, the layout produced by
+     * {@link #persistPeerStatuses(Set)}. Lines that do not have exactly three fields, or whose port
+     * is not a number, are skipped. Each recovered peer is given a flow file count of 1.
+     *
+     * @param file file to read
+     * @return the recovered peers, or {@code null} if the file does not exist
+     * @throws IOException if the file cannot be read
+     */
+    private static Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
+        if (!file.exists()) {
+            return null;
+        }
+
+        final Set<PeerStatus> statuses = new HashSet<>();
+        // decode with UTF-8, the charset persistPeerStatuses() encodes with, not the platform default
+        try (final InputStream fis = new FileInputStream(file);
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8))) {
+
+            String line;
+            while ((line = reader.readLine()) != null) {
+                final String[] splits = line.split(Pattern.quote(":"));
+                if (splits.length != 3) {
+                    continue;
+                }
+
+                final int port;
+                try {
+                    port = Integer.parseInt(splits[1]);
+                } catch (final NumberFormatException nfe) {
+                    // treat a corrupt entry like any other malformed line instead of failing construction
+                    logger.warn("Ignoring persisted peer entry with unparseable port in {}: {}", file, line);
+                    continue;
+                }
+
+                final String hostname = splits[0];
+                final boolean secure = Boolean.parseBoolean(splits[2]);
+
+                statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), 1));
+            }
+        }
+
+        return statuses;
+    }
+
+    /**
+     * Builds the list of peers to cycle through, in which each peer occupies a number of slots
+     * derived from its share of the total flow file count.
+     * <p>
+     * A peer's share is capped at 80%. For {@link TransferDirection#SEND} the weight is
+     * {@code 1 - share}; for any other direction the weight is the share itself. Every peer
+     * receives at least one slot.
+     *
+     * @param statuses peers to distribute over
+     * @param direction direction of the transfer the list is built for
+     * @return list holding one newly created {@link PeerStatus} per slot
+     */
+    List<PeerStatus> formulateDestinationList(final Set<PeerStatus> statuses, final TransferDirection direction) {
+
+        long flowFileTotal = 0L;
+        for (final PeerStatus peer : statuses) {
+            flowFileTotal += peer.getFlowFileCount();
+        }
+
+        final int slotBudget = Math.max(128, statuses.size());
+        final boolean sending = direction == TransferDirection.SEND;
+        final Map<PeerStatus, Integer> slotsPerPeer = new HashMap<>();
+        int slotTotal = 0;
+        for (final PeerStatus peer : statuses) {
+            final int flowFiles = peer.getFlowFileCount();
+            // don't allow any node to get more than 80% of the data
+            final double share = Math.min(0.8D, ((double) flowFiles / (double) flowFileTotal));
+            final double weight = sending ? (1 - share) : share;
+            final int slots = Math.max(1, (int) (slotBudget * weight));
+
+            slotsPerPeer.put(peer, slots);
+            slotTotal += slots;
+        }
+
+        // start with every slot empty; each peer is then placed into free slots
+        final List<PeerStatus> destinations = new ArrayList<>(Collections.<PeerStatus>nCopies(slotTotal, null));
+        for (final Map.Entry<PeerStatus, Integer> entry : slotsPerPeer.entrySet()) {
+            final PeerStatus peer = entry.getKey();
+            final int slots = entry.getValue();
+
+            for (int i = 0; i < slots; i++) {
+                // aim at multiples of the peer's slot count and walk forward to the next free slot
+                int probe = slots * i;
+                while (destinations.get(probe % destinations.size()) != null) {
+                    probe++;
+                }
+                destinations.set(probe % destinations.size(), new PeerStatus(peer.getPeerDescription(), peer.getFlowFileCount()));
+            }
+        }
+
+        final StringBuilder summary = new StringBuilder("New Weighted Distribution of Nodes:");
+        for (final Map.Entry<PeerStatus, Integer> entry : slotsPerPeer.entrySet()) {
+            final double percentage = entry.getValue() * 100D / destinations.size();
+            summary.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
+        }
+        logger.info(summary.toString());
+
+        // the list is returned in the placement order produced above; it is not shuffled
+        return destinations;
+    }
+
+    /**
+     * Penalizes the given peer by delegating to
+     * {@link #penalize(PeerDescription, long)} with the peer's description.
+     *
+     * @param peer the peer to penalize
+     * @param penalizationMillis period of time, in milliseconds, to penalize the peer for
+     */
+    public void penalize(final Peer peer, final long penalizationMillis) {
+        final PeerDescription description = peer.getDescription();
+        penalize(description, penalizationMillis);
+    }
+
+    /**
+     * Records that the described peer is penalized until now plus {@code penalizationMillis},
+     * unless an already recorded expiration lies further in the future, in which case that one is kept.
+     *
+     * @param peerDescription description of the peer to penalize
+     * @param penalizationMillis period of time, in milliseconds, to penalize the peer for
+     */
+    public void penalize(final PeerDescription peerDescription, final long penalizationMillis) {
+        final Long recorded = peerTimeoutExpirations.get(peerDescription);
+        final long currentExpiration = (recorded == null) ? 0L : recorded.longValue();
+        final long requestedExpiration = System.currentTimeMillis() + penalizationMillis;
+
+        peerTimeoutExpirations.put(peerDescription, Long.valueOf(Math.max(currentExpiration, requestedExpiration)));
+    }
+
+    /**
+     * @param peerStatus status whose peer description is looked up
+     * @return {@code true} if an expiration is recorded for the peer and it is still in the future
+     */
+    public boolean isPenalized(final PeerStatus peerStatus) {
+        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
+        if (expirationEnd == null) {
+            return false;
+        }
+        return expirationEnd > System.currentTimeMillis();
+    }
+
+    /**
+     * Forgets all recorded penalization expirations.
+     */
+    public void clear() {
+        this.peerTimeoutExpirations.clear();
+    }
+
+    /**
+     * @param peerList the peer list currently in use; may be null
+     * @return {@code true} if the list is null or empty, or if more than
+     *         {@code PEER_REFRESH_PERIOD} milliseconds have passed since the last refresh
+     */
+    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
+        if (peerList == null || peerList.isEmpty()) {
+            return true;
+        }
+        return System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD;
+    }
+
+    /**
+     * Return status of a peer that will be used for the next communication.
+     * The peer with less workload will be selected with higher probability.
+     * @param direction the amount of workload is calculated based on 
transaction direction,
+     *                  for SEND, a peer with less flow files is preferred,
+     *                  for RECEIVE, a peer with more flow files is preferred
+     * @return a selected peer, if there is no available peer or all peers are 
penalized, then return null
+     */
+    public PeerStatus getNextPeerStatus(final TransferDirection direction) {
+        List<PeerStatus> peerList = peerStatuses;
+        if (isPeerRefreshNeeded(peerList)) {
+            peerRefreshLock.lock();
+            try {
+                // now that we have the lock, check again that we need to 
refresh (because another thread
+                // could have been refreshing while we were waiting for the 
lock).
+                peerList = peerStatuses;
+                if (isPeerRefreshNeeded(peerList)) {
+                    try {
+                        peerList = createPeerStatusList(direction);
+                    } catch (final Exception e) {
+                        final String message = String.format("%s Failed to 
update list of peers due to %s", this, e.toString());
+                        warn(logger, eventReporter, message);
+                        if (logger.isDebugEnabled()) {
+                            logger.warn("", e);
+                        }
+                    }
+
+                    this.peerStatuses = peerList;
+                    peerRefreshTime = System.currentTimeMillis();
+                }
+            } finally {
+                peerRefreshLock.unlock();
+            }
+        }
+
+        if (peerList == null || peerList.isEmpty()) {
+            return null;
+        }
+
+        PeerStatus peerStatus;
+        for (int i = 0; i < peerList.size(); i++) {
+            final long idx = peerIndex.getAndIncrement();
+            final int listIndex = (int) (idx % peerList.size());
+            peerStatus = peerList.get(listIndex);
+
+            if (isPenalized(peerStatus)) {
+                logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
+            } else {
+                return peerStatus;
+            }
+        }
+
+        logger.debug("{} All peers appear to be penalized; returning null", 
this);
+        return null;
+    }
+
+    /**
+     * Builds the weighted destination list from the cached peer statuses, asking for a refresh
+     * once if nothing is cached.
+     *
+     * @param direction transfer direction passed on to {@link #formulateDestinationList(Set, TransferDirection)}
+     * @return the weighted list, or an empty list if no peers are known even after the refresh
+     * @throws IOException declared by the signature
+     */
+    private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
+        Set<PeerStatus> known = getPeerStatuses();
+        if (known != null) {
+            return formulateDestinationList(known, direction);
+        }
+
+        refreshPeers();
+        known = getPeerStatuses();
+        if (known == null) {
+            logger.debug("{} found no peers to connect to", this);
+            return Collections.emptyList();
+        }
+
+        return formulateDestinationList(known, direction);
+    }
+
+    /**
+     * Returns the cached peer statuses. When the cache is older than {@code PEER_CACHE_MILLIS},
+     * a copy is returned in which every peer has a flow file count of 1.
+     *
+     * @return the cached (or equalized) statuses, or {@code null} if nothing is cached
+     */
+    private Set<PeerStatus> getPeerStatuses() {
+        final PeerStatusCache cache = this.peerStatusCache;
+        if (cache == null) {
+            return null;
+        }
+
+        final Set<PeerStatus> cached = cache.getStatuses();
+        if (cached == null || cached.isEmpty()) {
+            return null;
+        }
+
+        final boolean stale = cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
+        if (!stale) {
+            return cached;
+        }
+
+        // the cached counts are outdated, so give every peer the same count
+        final Set<PeerStatus> equalized = new HashSet<>(cached.size());
+        for (final PeerStatus status : cached) {
+            equalized.add(new PeerStatus(status.getPeerDescription(), 1));
+        }
+        return equalized;
+    }
+
+    /**
+     * Fetches the peer statuses from the provider, persists them and replaces the cache,
+     * unless the existing cache is younger than {@code PEER_CACHE_MILLIS}.
+     * Any failure is reported as a warning and does not propagate.
+     */
+    public void refreshPeers() {
+        final PeerStatusCache current = peerStatusCache;
+        final boolean stillFresh = current != null && (current.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis());
+        if (stillFresh) {
+            return;
+        }
+
+        try {
+            final Set<PeerStatus> fetched = peerStatusProvider.fetchRemotePeerStatuses();
+            persistPeerStatuses(fetched);
+            peerStatusCache = new PeerStatusCache(fetched);
+            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, fetched.size());
+        } catch (Exception e) {
+            warn(logger, eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
+            if (logger.isDebugEnabled()) {
+                logger.debug("", e);
+            }
+        }
+    }
+
+    /**
+     * @param eventReporter reporter handed to the warn/error reporting helpers used by this class
+     */
+    public void setEventReporter(final EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
new file mode 100644
index 0000000..68c30af
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerStatusProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.PeerStatus;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Source of peer statuses; {@link PeerSelector#refreshPeers()} calls it to obtain the set of
+ * peers that it then persists and caches.
+ */
+public interface PeerStatusProvider {
+
+    /**
+     * @return the current peer statuses
+     * @throws IOException if the statuses cannot be obtained
+     */
+    Set<PeerStatus> fetchRemotePeerStatuses() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
new file mode 100644
index 0000000..740ac3a
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SiteInfoProvider {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SiteInfoProvider.class);
+
+    private static final long REMOTE_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+    private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
+    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
+    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
+    private Integer siteToSitePort;
+    private Integer siteToSiteHttpPort;
+    private Boolean siteToSiteSecure;
+    private long remoteRefreshTime;
+    private HttpProxy proxy;
+
+    private final Map<String, String> inputPortMap = new HashMap<>(); // map 
input port name to identifier
+    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
+
+    private URI clusterUrl;
+    private SSLContext sslContext;
+    private int connectTimeoutMillis;
+    private int readTimeoutMillis;
+
+    /**
+     * Fetches the controller from the remote instance and, under the write lock, replaces the
+     * cached ports, secure flag and port-name maps with the fetched values and stamps the refresh time.
+     *
+     * @return the fetched controller
+     * @throws IOException if the controller cannot be fetched
+     */
+    private ControllerDTO refreshRemoteInfo() throws IOException {
+        final ControllerDTO fetched;
+
+        // the remote call is made before the write lock is taken
+        try (final SiteToSiteRestApiClient restClient = new SiteToSiteRestApiClient(sslContext, proxy)) {
+            restClient.resolveBaseUrl(clusterUrl);
+            restClient.setConnectTimeoutMillis(connectTimeoutMillis);
+            restClient.setReadTimeoutMillis(readTimeoutMillis);
+            fetched = restClient.getController();
+        }
+
+        remoteInfoWriteLock.lock();
+        try {
+            siteToSitePort = fetched.getRemoteSiteListeningPort();
+            siteToSiteHttpPort = fetched.getRemoteSiteHttpListeningPort();
+            siteToSiteSecure = fetched.isSiteToSiteSecure();
+
+            inputPortMap.clear();
+            for (final PortDTO port : fetched.getInputPorts()) {
+                inputPortMap.put(port.getName(), port.getId());
+            }
+
+            outputPortMap.clear();
+            for (final PortDTO port : fetched.getOutputPorts()) {
+                outputPortMap.put(port.getName(), port.getId());
+            }
+
+            remoteRefreshTime = System.currentTimeMillis();
+        } finally {
+            remoteInfoWriteLock.unlock();
+        }
+
+        return fetched;
+    }
+
+    /**
+     * @return {@code true} if the scheme of the cluster URL is {@code https}, compared without
+     *         regard to case; {@code false} for any other scheme or when the URL has no scheme
+     */
+    public boolean isWebInterfaceSecure() {
+        // compare the parsed scheme rather than a string prefix: schemes are case-insensitive,
+        // and a prefix test would also accept any scheme that merely begins with "https"
+        return "https".equalsIgnoreCase(clusterUrl.getScheme());
+    }
+
+    /**
+     * Returns the cached RAW socket site-to-site port while it is non-null and younger than
+     * {@code REMOTE_REFRESH_MILLIS}; otherwise refreshes the remote info and returns the fetched value.
+     *
+     * @return the port that the remote instance is listening on for
+     * RAW Socket site-to-site communication, or <code>null</code> if the remote instance
+     * is not configured to allow site-to-site communications.
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    public Integer getSiteToSitePort() throws IOException {
+        remoteInfoReadLock.lock();
+        try {
+            final Integer cachedPort = this.siteToSitePort;
+            final long oldestAcceptable = System.currentTimeMillis() - REMOTE_REFRESH_MILLIS;
+            if (cachedPort != null && this.remoteRefreshTime > oldestAcceptable) {
+                return cachedPort;
+            }
+        } finally {
+            remoteInfoReadLock.unlock();
+        }
+
+        return refreshRemoteInfo().getRemoteSiteListeningPort();
+    }
+
+    /**
+     * Returns the cached HTTP(S) site-to-site port while it is non-null and younger than
+     * {@code REMOTE_REFRESH_MILLIS}; otherwise refreshes the remote info and returns the fetched value.
+     *
+     * @return the port that the remote instance is listening on for
+     * HTTP(S) site-to-site communication, or <code>null</code> if the remote instance
+     * is not configured to allow site-to-site communications.
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    public Integer getSiteToSiteHttpPort() throws IOException {
+        remoteInfoReadLock.lock();
+        try {
+            final Integer cachedHttpPort = this.siteToSiteHttpPort;
+            final long oldestAcceptable = System.currentTimeMillis() - REMOTE_REFRESH_MILLIS;
+            if (cachedHttpPort != null && this.remoteRefreshTime > oldestAcceptable) {
+                return cachedHttpPort;
+            }
+        } finally {
+            remoteInfoReadLock.unlock();
+        }
+
+        return refreshRemoteInfo().getRemoteSiteHttpListeningPort();
+    }
+
+    /**
+     * Returns the cached secure flag while it is non-null and younger than
+     * {@code REMOTE_REFRESH_MILLIS}; otherwise refreshes the remote info and uses the fetched flag.
+     *
+     * @return {@code true} if the remote instance is configured for secure
+     * site-to-site communications, {@code false} otherwise
+     * @throws IOException if unable to check if secure, including when the refreshed
+     * controller reports no secure flag at all
+     */
+    public boolean isSecure() throws IOException {
+        remoteInfoReadLock.lock();
+        try {
+            final Boolean cachedSecure = this.siteToSiteSecure;
+            final long oldestAcceptable = System.currentTimeMillis() - REMOTE_REFRESH_MILLIS;
+            if (cachedSecure != null && this.remoteRefreshTime > oldestAcceptable) {
+                return cachedSecure;
+            }
+        } finally {
+            remoteInfoReadLock.unlock();
+        }
+
+        final Boolean refreshedSecure = refreshRemoteInfo().isSiteToSiteSecure();
+        if (refreshedSecure == null) {
+            throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+        }
+
+        return refreshedSecure;
+    }
+
+    /**
+     * Resolves a port name to its identifier: output ports are searched when receiving,
+     * input ports for any other direction.
+     *
+     * @param portName name of the port
+     * @param transferDirection direction of the transfer
+     * @return the port identifier, or {@code null} if the name is unknown
+     * @throws IOException if the remote info has to be refreshed and that fails
+     */
+    public String getPortIdentifier(final String portName, final TransferDirection transferDirection) throws  IOException {
+        final boolean receiving = transferDirection == TransferDirection.RECEIVE;
+        return receiving ? getOutputPortIdentifier(portName) : getInputPortIdentifier(portName);
+    }
+
+    /**
+     * @param portName name of an input port
+     * @return the identifier mapped to the name among the input ports, or {@code null} if unknown
+     * @throws IOException if the remote info has to be refreshed and that fails
+     */
+    public String getInputPortIdentifier(final String portName) throws IOException {
+        final Map<String, String> inputPorts = this.inputPortMap;
+        return getPortIdentifier(portName, inputPorts);
+    }
+
+    /**
+     * @param portName name of an output port
+     * @return the identifier mapped to the name among the output ports, or {@code null} if unknown
+     * @throws IOException if the remote info has to be refreshed and that fails
+     */
+    public String getOutputPortIdentifier(final String portName) throws IOException {
+        final Map<String, String> outputPorts = this.outputPortMap;
+        return getPortIdentifier(portName, outputPorts);
+    }
+
+    private String getPortIdentifier(final String portName, final Map<String, 
String> portMap) throws IOException {
+        String identifier;
+        remoteInfoReadLock.lock();
+        try {
+            identifier = portMap.get(portName);
+        } finally {
+            remoteInfoReadLock.unlock();
+        }
+
+        if (identifier != null) {
+            return identifier;
+        }
+
+        refreshRemoteInfo();
+
+        remoteInfoReadLock.lock();
+        try {
+            return portMap.get(portName);
+        } finally {
+            remoteInfoReadLock.unlock();
+        }
+    }
+
+    /**
+     * @param clusterUrl URL from which the REST client resolves its base URL when the remote info is refreshed
+     */
+    public void setClusterUrl(final URI clusterUrl) {
+        this.clusterUrl = clusterUrl;
+    }
+
+    /**
+     * @param sslContext SSL context handed to the REST client when the remote info is refreshed
+     */
+    public void setSslContext(final SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    /**
+     * @param connectTimeoutMillis connect timeout, in milliseconds, applied to the REST client on refresh
+     */
+    public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    /**
+     * @param readTimeoutMillis read timeout, in milliseconds, applied to the REST client on refresh
+     */
+    public void setReadTimeoutMillis(final int readTimeoutMillis) {
+        this.readTimeoutMillis = readTimeoutMillis;
+    }
+
+    /**
+     * @param proxy HTTP proxy handed to the REST client when the remote info is refreshed
+     */
+    public void setProxy(final HttpProxy proxy) {
+        this.proxy = proxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index d982cc4..32d1141 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,29 +16,31 @@
  */
 package org.apache.nifi.remote.client;
 
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.security.KeyStore;
-import java.security.SecureRandom;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.http.HttpClient;
 import org.apache.nifi.remote.client.socket.SocketClient;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <p>
@@ -163,6 +165,8 @@ public interface SiteToSiteClient extends Closeable {
         private int batchCount;
         private long batchSize;
         private long batchNanos;
+        private SiteToSiteTransportProtocol transportProtocol = 
SiteToSiteTransportProtocol.RAW;
+        private HttpProxy httpProxy;
 
         /**
          * Populates the builder with values from the provided config
@@ -185,11 +189,13 @@ public interface SiteToSiteClient extends Closeable {
             this.eventReporter = config.getEventReporter();
             this.peerPersistenceFile = config.getPeerPersistenceFile();
             this.useCompression = config.isUseCompression();
+            this.transportProtocol = config.getTransportProtocol();
             this.portName = config.getPortName();
             this.portIdentifier = config.getPortIdentifier();
             this.batchCount = config.getPreferredBatchCount();
             this.batchSize = config.getPreferredBatchSize();
             this.batchNanos = 
config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+            this.httpProxy = config.getHttpProxy();
 
             return this;
         }
@@ -441,6 +447,16 @@ public interface SiteToSiteClient extends Closeable {
         }
 
         /**
+         * Specifies the protocol to use for site to site data transport.
+         * @param transportProtocol transport protocol
+         * @return the builder
+         */
+        public Builder transportProtocol(final SiteToSiteTransportProtocol 
transportProtocol) {
+            this.transportProtocol = transportProtocol;
+            return this;
+        }
+
+        /**
          * Specifies the name of the port to communicate with. Either the port
          * name or the port identifier must be specified.
          *
@@ -521,7 +537,8 @@ public interface SiteToSiteClient extends Closeable {
          *         data with remote instances of NiFi
          *
          * @throws IllegalStateException if either the url is not set or 
neither
-         *             the port name nor port identifier is set.
+         *             the port name nor port identifier is set,
+         *             or if the transport protocol is not supported.
          */
         public SiteToSiteClient build() {
             if (url == null) {
@@ -532,7 +549,14 @@ public interface SiteToSiteClient extends Closeable {
                 throw new IllegalStateException("Must specify either Port Name 
or Port Identifier to build Site-to-Site client");
             }
 
-            return new SocketClient(buildConfig());
+            switch (transportProtocol){
+                case RAW:
+                    return new SocketClient(buildConfig());
+                case HTTP:
+                    return new HttpClient(buildConfig());
+                default:
+                    throw new IllegalStateException("Transport protocol '" + 
transportProtocol + "' is not supported.");
+            }
         }
 
         /**
@@ -600,6 +624,13 @@ public interface SiteToSiteClient extends Closeable {
         }
 
         /**
+         * @return the transport protocol to use, defaults to RAW
+         */
+        public SiteToSiteTransportProtocol getTransportProtocol(){
+            return transportProtocol;
+        }
+
+        /**
          * @return the name of the port that the client is to communicate with
          */
         public String getPortName() {
@@ -613,6 +644,22 @@ public interface SiteToSiteClient extends Closeable {
         public String getPortIdentifier() {
             return portIdentifier;
         }
+
+
+        /**
+         * Specifies the HTTP proxy information to use with the HTTP protocol of
+         * Site-to-Site communication.
+         *
+         * @param httpProxy HTTP proxy information
+         * @return the builder
+         */
+        public Builder httpProxy(final HttpProxy httpProxy) {
+            this.httpProxy = httpProxy;
+            return this;
+        }
+
+        /**
+         * @return the HTTP proxy information held by this builder, or {@code null}
+         *         if none has been set
+         */
+        public HttpProxy getHttpProxy() {
+            return httpProxy;
+        }
+
     }
 
 
@@ -634,11 +681,13 @@ public interface SiteToSiteClient extends Closeable {
         private final EventReporter eventReporter;
         private final File peerPersistenceFile;
         private final boolean useCompression;
+        private final SiteToSiteTransportProtocol transportProtocol;
         private final String portName;
         private final String portIdentifier;
         private final int batchCount;
         private final long batchSize;
         private final long batchNanos;
+        private final HttpProxy httpProxy;
 
         // some serialization frameworks require a default constructor
         private StandardSiteToSiteClientConfig() {
@@ -661,6 +710,8 @@ public interface SiteToSiteClient extends Closeable {
             this.batchCount = 0;
             this.batchSize = 0;
             this.batchNanos = 0;
+            this.transportProtocol = null;
+            this.httpProxy = null;
         }
 
         private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder 
builder) {
@@ -683,6 +734,8 @@ public interface SiteToSiteClient extends Closeable {
             this.batchCount = builder.batchCount;
             this.batchSize = builder.batchSize;
             this.batchNanos = builder.batchNanos;
+            this.transportProtocol = builder.getTransportProtocol();
+            this.httpProxy = builder.getHttpProxy();
         }
 
         @Override
@@ -830,5 +883,15 @@ public interface SiteToSiteClient extends Closeable {
         public KeystoreType getTruststoreType() {
             return truststoreType;
         }
+
+        @Override
+        public SiteToSiteTransportProtocol getTransportProtocol() {
+            return transportProtocol;
+        }
+
+        @Override
+        public HttpProxy getHttpProxy() {
+            return httpProxy;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 59891f0..65a7cfc 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -24,6 +24,8 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
 
 public interface SiteToSiteClientConfig extends Serializable {
 
@@ -101,6 +103,11 @@ public interface SiteToSiteClientConfig extends 
Serializable {
     boolean isUseCompression();
 
     /**
+     * @return a transport protocol to use
+     */
+    SiteToSiteTransportProtocol getTransportProtocol();
+
+    /**
      * @return the name of the port that the client is to communicate with
      */
     String getPortName();
@@ -146,4 +153,10 @@ public interface SiteToSiteClientConfig extends 
Serializable {
      */
     EventReporter getEventReporter();
 
+    /**
+     * Return Proxy for HTTP Transport Protocol.
+     * @return proxy or null if not specified
+     */
+    HttpProxy getHttpProxy();
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
new file mode 100644
index 0000000..3312e88
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.http;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
+import org.apache.nifi.remote.client.PeerSelector;
+import org.apache.nifi.remote.client.PeerStatusProvider;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.http.HttpClientTransaction;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.web.api.dto.remote.PeerDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Site-to-Site client that communicates with the remote NiFi through its REST API.
+ *
+ * <p>On construction it starts a single daemon thread that asks the {@link PeerSelector}
+ * to refresh its peers immediately and then with a fixed delay of five seconds. The client
+ * itself acts as the {@link PeerStatusProvider} for that selector.</p>
+ */
+public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpClient.class);
+
+    private final ScheduledExecutorService taskExecutor;
+    private final PeerSelector peerSelector;
+
+    /**
+     * Creates the client and schedules the periodic peer refresh.
+     *
+     * @param config the Site-to-Site client configuration
+     */
+    public HttpClient(final SiteToSiteClientConfig config) {
+        super(config);
+
+        peerSelector = new PeerSelector(this, config.getPeerPersistenceFile());
+        peerSelector.setEventReporter(config.getEventReporter());
+
+        taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = defaultFactory.newThread(r);
+                thread.setName("Http Site-to-Site PeerSelector");
+                // Daemon thread, so the refresh task never keeps the JVM alive.
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+
+        taskExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                peerSelector.refreshPeers();
+            }
+        }, 0, 5, TimeUnit.SECONDS);
+
+    }
+
+    /**
+     * Asks the remote cluster for its current peers through the REST API.
+     *
+     * @return the peers reported by the remote instance; never empty
+     * @throws IOException if the remote instance reports no HTTP Site-to-Site port,
+     *             or if it returns no peers
+     * @throws IllegalArgumentException if the configured URL is not a valid URI
+     */
+    @Override
+    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
+        if (siteInfoProvider.getSiteToSiteHttpPort() == null) {
+            throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications");
+        }
+
+        final String scheme = siteInfoProvider.isSecure() ? "https" : "http";
+        final URI clusterUrl;
+        try {
+            clusterUrl = new URI(config.getUrl());
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
+        }
+
+        try (
+            SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy())
+        ) {
+            String clusterApiUrl = apiClient.resolveBaseUrl(scheme, clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort());
+
+            int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
+            apiClient.setConnectTimeoutMillis(timeoutMillis);
+            apiClient.setReadTimeoutMillis(timeoutMillis);
+            Collection<PeerDTO> peers = apiClient.getPeers();
+            if (peers == null || peers.isEmpty()) {
+                throw new IOException("Couldn't get any peer to communicate with. " + clusterApiUrl + " returned zero peers.");
+            }
+
+            return peers.stream()
+                    .map(p -> new PeerStatus(new PeerDescription(p.getHostname(), p.getPort(), p.isSecure()), p.getFlowFileCount()))
+                    .collect(Collectors.toSet());
+        }
+    }
+
+    /**
+     * Creates a transaction with the first peer that accepts it.
+     *
+     * <p>Peers are taken from the {@link PeerSelector} one at a time. A peer that fails to
+     * initiate the transaction is penalized and the next one is tried, except that
+     * {@link UnknownPortException} and {@link PortNotRunningException} are rethrown
+     * immediately.</p>
+     *
+     * @param direction whether data is to be sent or received
+     * @return the new transaction, or {@code null} if no peer could be used
+     * @throws IOException if the port identifier cannot be determined
+     */
+    @Override
+    public Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException {
+
+        int timeoutMillis = (int) config.getTimeout(TimeUnit.MILLISECONDS);
+
+        PeerStatus peerStatus;
+        while ((peerStatus = peerSelector.getNextPeerStatus(direction)) != null) {
+            logger.debug("peerStatus={}", peerStatus);
+
+            CommunicationsSession commSession = new HttpCommunicationsSession();
+            String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
+            commSession.setUri(nodeApiUrl);
+            String clusterUrl = config.getUrl();
+            Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
+
+            int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
+            String portId = config.getPortIdentifier();
+            if (StringUtils.isEmpty(portId)) {
+                portId = siteInfoProvider.getPortIdentifier(config.getPortName(), direction);
+                if (StringUtils.isEmpty(portId)) {
+                    peer.close();
+                    throw new IOException("Failed to determine the identifier of port " + config.getPortName());
+                }
+            }
+
+            SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(config.getSslContext(), config.getHttpProxy());
+
+            apiClient.setBaseUrl(peer.getUrl());
+            apiClient.setConnectTimeoutMillis(timeoutMillis);
+            apiClient.setReadTimeoutMillis(timeoutMillis);
+
+            apiClient.setCompress(config.isUseCompression());
+            apiClient.setRequestExpirationMillis(config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS));
+            apiClient.setBatchCount(config.getPreferredBatchCount());
+            apiClient.setBatchSize(config.getPreferredBatchSize());
+            apiClient.setBatchDurationMillis(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
+
+            final String transactionUrl;
+            try {
+                transactionUrl = apiClient.initiateTransaction(direction, portId);
+                commSession.setUserDn(apiClient.getTrustedPeerDn());
+            } catch (Exception e) {
+                // This REST client is never handed to a transaction, so release it here;
+                // otherwise every failed peer attempt would leak one client.
+                closeQuietly(apiClient);
+
+                logger.debug("Penalizing a peer due to {}", e.getMessage());
+                peerSelector.penalize(peer, penaltyMillis);
+
+                if (e instanceof UnknownPortException || e instanceof PortNotRunningException) {
+                    throw e;
+                }
+
+                logger.debug("Continue trying other peers...");
+                continue;
+            }
+
+            // We found a valid peer to communicate with.
+            // From here on the REST client is passed to the transaction via initialize().
+            Integer transactionProtocolVersion = apiClient.getTransactionProtocolVersion();
+            HttpClientTransaction transaction = new HttpClientTransaction(transactionProtocolVersion, peer, direction,
+                    config.isUseCompression(), portId, penaltyMillis, config.getEventReporter());
+            transaction.initialize(apiClient, transactionUrl);
+
+            return transaction;
+        }
+
+        logger.info("Couldn't find a valid peer to communicate with.");
+        return null;
+
+    }
+
+    /**
+     * Closes the given resource, logging instead of propagating any failure so that the
+     * exception which triggered the cleanup is not masked.
+     */
+    private static void closeQuietly(final AutoCloseable resource) {
+        try {
+            resource.close();
+        } catch (final Exception closeFailure) {
+            logger.debug("Failed to close {} due to {}", resource, closeFailure.getMessage());
+        }
+    }
+
+    /**
+     * Builds the REST API base URL of a node from its peer description.
+     */
+    private String resolveNodeApiUrl(PeerDescription description) {
+        return (description.isSecure() ? "https" : "http") + "://" + description.getHostname() + ":" + description.getPort() + "/nifi-api";
+    }
+
+    /**
+     * @return the value reported by {@code siteInfoProvider.isWebInterfaceSecure()}
+     */
+    @Override
+    public boolean isSecure() throws IOException {
+        return siteInfoProvider.isWebInterfaceSecure();
+    }
+
+    /**
+     * Shuts down the peer-refresh executor and clears the peer selector.
+     */
+    @Override
+    public void close() throws IOException {
+        taskExecutor.shutdown();
+        peerSelector.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
new file mode 100644
index 0000000..d0a6368
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/TransportProtocolVersionNegotiator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.http;
+
+import org.apache.nifi.remote.StandardVersionNegotiator;
+
+/**
+ * A {@link StandardVersionNegotiator} for the transport protocol that can also report
+ * the transaction protocol version paired with the current transport protocol version.
+ */
+public class TransportProtocolVersionNegotiator extends StandardVersionNegotiator {
+
+    public TransportProtocolVersionNegotiator(final int... supportedVersions) {
+        super(supportedVersions);
+    }
+
+    /**
+     * Maps the current transport protocol version to its transaction protocol version.
+     * Keeping this mapping here lets the transport protocol evolve independently of the
+     * transaction protocol.
+     *
+     * @return the transaction protocol version for the current transport protocol version
+     * @throws RuntimeException if the current transport protocol version has no mapping
+     */
+    public int getTransactionProtocolVersion() {
+        // Transport protocol version 1 is paired with transaction protocol version 5.
+        if (getVersion() == 1) {
+            return 5;
+        }
+
+        final String message = "Transport protocol version " + getVersion()
+                + " was not configured with any transaction protocol version.";
+        throw new RuntimeException(message);
+    }
+
+}

Reply via email to