http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index fa35f28..8a6a91f 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -16,29 +16,43 @@
  */
 package org.apache.nifi.remote.client.socket;
 
-import java.io.BufferedReader;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.PeerSelector;
+import org.apache.nifi.remote.client.PeerStatusProvider;
+import org.apache.nifi.remote.client.SiteInfoProvider;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -49,125 +63,47 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-import javax.net.ssl.SSLContext;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerDescription;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteDestination;
-import org.apache.nifi.remote.RemoteResourceInitiator;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-import org.apache.nifi.remote.util.NiFiRestApiUtil;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.helpers.MessageFormatter;
 
-public class EndpointConnectionPool {
+import static org.apache.nifi.remote.util.EventReportUtil.error;
+import static org.apache.nifi.remote.util.EventReportUtil.warn;
 
-    public static final long PEER_REFRESH_PERIOD = 60000L;
-    public static final String CATEGORY = "Site-to-Site";
-    public static final long REMOTE_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
-
-    private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
+public class EndpointConnectionPool implements PeerStatusProvider {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EndpointConnectionPool.class);
 
     private final ConcurrentMap<PeerDescription, 
BlockingQueue<EndpointConnection>> connectionQueueMap = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations 
= new ConcurrentHashMap<>();
     private final URI clusterUrl;
-    private final String apiUri;
-
-    private final AtomicLong peerIndex = new AtomicLong(0L);
 
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile long peerRefreshTime = 0L;
-    private volatile PeerStatusCache peerStatusCache;
-    private final Set<EndpointConnection> activeConnections = 
Collections.synchronizedSet(new HashSet<EndpointConnection>());
+    private final Set<EndpointConnection> activeConnections = 
Collections.synchronizedSet(new HashSet<>());
 
-    private final File peersFile;
     private final EventReporter eventReporter;
     private final SSLContext sslContext;
     private final ScheduledExecutorService taskExecutor;
     private final int idleExpirationMillis;
     private final RemoteDestination remoteDestination;
 
-    private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
-    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
-    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
-    private Integer siteToSitePort;
-    private Boolean siteToSiteSecure;
-    private long remoteRefreshTime;
-    private final Map<String, String> inputPortMap = new HashMap<>(); // map 
input port name to identifier
-    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
-
     private volatile int commsTimeout;
     private volatile boolean shutdown = false;
 
-    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis,
-            final int idleExpirationMillis, final EventReporter eventReporter, 
final File persistenceFile) {
-        this(clusterUrl, remoteDestination, commsTimeoutMillis, 
idleExpirationMillis, null, eventReporter, persistenceFile);
-    }
+    private final SiteInfoProvider siteInfoProvider;
+    private final PeerSelector peerSelector;
 
-    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, final int 
idleExpirationMillis,
-            final SSLContext sslContext, final EventReporter eventReporter, 
final File persistenceFile) {
+    public EndpointConnectionPool(final URI clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, final int 
idleExpirationMillis,
+            final SSLContext sslContext, final EventReporter eventReporter, 
final File persistenceFile, final SiteInfoProvider siteInfoProvider) {
         Objects.requireNonNull(clusterUrl, "URL cannot be null");
         Objects.requireNonNull(remoteDestination, "Remote Destination/Port 
Identifier cannot be null");
-        try {
-            this.clusterUrl = new URI(clusterUrl);
-        } catch (final URISyntaxException e) {
-            throw new IllegalArgumentException("Invalid Cluster URL: " + 
clusterUrl);
-        }
-
-        // Trim the trailing /
-        String uriPath = this.clusterUrl.getPath();
-        if (uriPath.endsWith("/")) {
-            uriPath = uriPath.substring(0, uriPath.length() - 1);
-        }
-        apiUri = this.clusterUrl.getScheme() + "://" + 
this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
 
+        this.clusterUrl = clusterUrl;
         this.remoteDestination = remoteDestination;
         this.sslContext = sslContext;
-        this.peersFile = persistenceFile;
         this.eventReporter = eventReporter;
         this.commsTimeout = commsTimeoutMillis;
         this.idleExpirationMillis = idleExpirationMillis;
 
-        Set<PeerStatus> recoveredStatuses;
-        if (persistenceFile != null && persistenceFile.exists()) {
-            try {
-                recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
-                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, 
peersFile.lastModified());
-            } catch (final IOException ioe) {
-                logger.warn("Failed to recover peer statuses from {} due to 
{}; will continue without loading information from file", persistenceFile, ioe);
-            }
-        } else {
-            peerStatusCache = null;
-        }
+        this.siteInfoProvider = siteInfoProvider;
+
+        peerSelector = new PeerSelector(this, persistenceFile);
+        peerSelector.setEventReporter(eventReporter);
 
         // Initialize a scheduled executor and run some maintenance tasks in 
the background to kill off old, unused
         // connections and keep our list of peers up-to-date.
@@ -186,7 +122,7 @@ public class EndpointConnectionPool {
         taskExecutor.scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                refreshPeers();
+                peerSelector.refreshPeers();
             }
         }, 0, 5, TimeUnit.SECONDS);
 
@@ -198,38 +134,11 @@ public class EndpointConnectionPool {
         }, 5, 5, TimeUnit.SECONDS);
     }
 
-    void warn(final String msg, final Object... args) {
-        logger.warn(msg, args);
-        if (eventReporter != null) {
-            eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
-        }
-    }
-
-    void warn(final String msg, final Throwable t) {
-        logger.warn(msg, t);
-
-        if (eventReporter != null) {
-            eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + 
": " + t.toString());
-        }
-    }
-
-    void error(final String msg, final Object... args) {
-        logger.error(msg, args);
-        if (eventReporter != null) {
-            eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
-        }
-    }
-
     private String getPortIdentifier(final TransferDirection 
transferDirection) throws IOException {
         if (remoteDestination.getIdentifier() != null) {
             return remoteDestination.getIdentifier();
         }
-
-        if (transferDirection == TransferDirection.RECEIVE) {
-            return getOutputPortIdentifier(remoteDestination.getName());
-        } else {
-            return getInputPortIdentifier(remoteDestination.getName());
-        }
+        return siteInfoProvider.getPortIdentifier(remoteDestination.getName(), 
transferDirection);
     }
 
     public EndpointConnection getEndpointConnection(final TransferDirection 
direction) throws IOException {
@@ -250,7 +159,7 @@ public class EndpointConnectionPool {
         do {
             final List<EndpointConnection> addBack = new ArrayList<>();
             logger.debug("{} getting next peer status", this);
-            final PeerStatus peerStatus = getNextPeerStatus(direction);
+            final PeerStatus peerStatus = 
peerSelector.getNextPeerStatus(direction);
             logger.debug("{} next peer status = {}", this, peerStatus);
             if (peerStatus == null) {
                 return null;
@@ -296,7 +205,7 @@ public class EndpointConnectionPool {
                         logger.debug("{} Establishing site-to-site connection 
with {}", this, peerStatus);
                         commsSession = 
establishSiteToSiteConnection(peerStatus);
                     } catch (final IOException ioe) {
-                        penalize(peerStatus.getPeerDescription(), 
penalizationMillis);
+                        peerSelector.penalize(peerStatus.getPeerDescription(), 
penalizationMillis);
                         throw ioe;
                     }
 
@@ -334,7 +243,7 @@ public class EndpointConnectionPool {
                             logger.warn("{} {} indicates that port {}'s 
destination is full; penalizing peer",
                                     this, peer, config.getPortName() == null ? 
config.getPortIdentifier() : config.getPortName());
 
-                            penalize(peer, penalizationMillis);
+                            peerSelector.penalize(peer, penalizationMillis);
                             try {
                                 peer.close();
                             } catch (final IOException ioe) {
@@ -342,11 +251,11 @@ public class EndpointConnectionPool {
 
                             continue;
                         } else if (protocol.isPortInvalid()) {
-                            penalize(peer, penalizationMillis);
+                            peerSelector.penalize(peer, penalizationMillis);
                             cleanup(protocol, peer);
                             throw new PortNotRunningException(peer.toString() 
+ " indicates that port " + portId + " is not running");
                         } else if (protocol.isPortUnknown()) {
-                            penalize(peer, penalizationMillis);
+                            peerSelector.penalize(peer, penalizationMillis);
                             cleanup(protocol, peer);
                             throw new UnknownPortException(peer.toString() + " 
indicates that port " + portId + " is not known");
                         }
@@ -358,11 +267,11 @@ public class EndpointConnectionPool {
                     } catch (final PortNotRunningException | 
UnknownPortException e) {
                         throw e;
                     } catch (final Exception e) {
-                        penalize(peer, penalizationMillis);
+                        peerSelector.penalize(peer, penalizationMillis);
                         cleanup(protocol, peer);
 
                         final String message = String.format("%s failed to 
communicate with %s due to %s", this, peer == null ? clusterUrl : peer, 
e.toString());
-                        error(message);
+                        error(logger, eventReporter, message);
                         if (logger.isDebugEnabled()) {
                             logger.error("", e);
                         }
@@ -427,27 +336,6 @@ public class EndpointConnectionPool {
         }
     }
 
-    private void penalize(final PeerDescription peerDescription, final long 
penalizationMillis) {
-        Long expiration = peerTimeoutExpirations.get(peerDescription);
-        if (expiration == null) {
-            expiration = Long.valueOf(0L);
-        }
-
-        final long newExpiration = Math.max(expiration, 
System.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(peerDescription, 
Long.valueOf(newExpiration));
-    }
-
-    /**
-     * Updates internal state map to penalize a PeerStatus that points to the
-     * specified peer
-     *
-     * @param peer the peer
-     * @param penalizationMillis period of time to penalize a given peer
-     */
-    public void penalize(final Peer peer, final long penalizationMillis) {
-        penalize(peer.getDescription(), penalizationMillis);
-    }
-
     private void cleanup(final SocketClientProtocol protocol, final Peer peer) 
{
         if (protocol != null && peer != null) {
             try {
@@ -470,113 +358,11 @@ public class EndpointConnectionPool {
         }
     }
 
-    private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
-        return (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
-    }
-
-    private PeerStatus getNextPeerStatus(final TransferDirection direction) {
-        List<PeerStatus> peerList = peerStatuses;
-        if (isPeerRefreshNeeded(peerList)) {
-            peerRefreshLock.lock();
-            try {
-                // now that we have the lock, check again that we need to 
refresh (because another thread
-                // could have been refreshing while we were waiting for the 
lock).
-                peerList = peerStatuses;
-                if (isPeerRefreshNeeded(peerList)) {
-                    try {
-                        peerList = createPeerStatusList(direction);
-                    } catch (final Exception e) {
-                        final String message = String.format("%s Failed to 
update list of peers due to %s", this, e.toString());
-                        warn(message);
-                        if (logger.isDebugEnabled()) {
-                            logger.warn("", e);
-                        }
-
-                        if (eventReporter != null) {
-                            eventReporter.reportEvent(Severity.WARNING, 
CATEGORY, message);
-                        }
-                    }
-
-                    this.peerStatuses = peerList;
-                    peerRefreshTime = System.currentTimeMillis();
-                }
-            } finally {
-                peerRefreshLock.unlock();
-            }
-        }
-
-        if (peerList == null || peerList.isEmpty()) {
-            return null;
-        }
-
-        PeerStatus peerStatus;
-        for (int i = 0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
-
-            if (isPenalized(peerStatus)) {
-                logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
-            } else {
-                return peerStatus;
-            }
-        }
-
-        logger.debug("{} All peers appear to be penalized; returning null", 
this);
-        return null;
-    }
-
-    private boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = 
peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd != null && expirationEnd > 
System.currentTimeMillis());
-    }
-
-    private List<PeerStatus> createPeerStatusList(final TransferDirection 
direction) throws IOException {
-        Set<PeerStatus> statuses = getPeerStatuses();
-        if (statuses == null) {
-            refreshPeers();
-            statuses = getPeerStatuses();
-            if (statuses == null) {
-                logger.debug("{} found no peers to connect to", this);
-                return Collections.emptyList();
-            }
-        }
-
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> nodeInfos = new ArrayList<>();
-        for (final PeerStatus peerStatus : statuses) {
-            final PeerDescription description = 
peerStatus.getPeerDescription();
-            final NodeInformation nodeInfo = new 
NodeInformation(description.getHostname(), description.getPort(), 0, 
description.isSecure(), peerStatus.getFlowFileCount());
-            nodeInfos.add(nodeInfo);
-        }
-        clusterNodeInfo.setNodeInformation(nodeInfos);
-        return formulateDestinationList(clusterNodeInfo, direction);
-    }
-
-    private Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || 
cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < 
System.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new 
HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new 
PeerStatus(status.getPeerDescription(), 1);
-                equalizedSet.add(equalizedStatus);
-            }
-
-            return equalizedSet;
-        }
-
-        return cache.getStatuses();
-    }
-
-    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
+    public Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
         final String hostname = clusterUrl.getHost();
-        final Integer port = getSiteToSitePort();
+        final Integer port = siteInfoProvider.getSiteToSitePort();
         if (port == null) {
-            throw new IOException("Remote instance of NiFi is not configured 
to allow site-to-site communications");
+            throw new IOException("Remote instance of NiFi is not configured 
to allow RAW Socket site-to-site communications");
         }
 
         final PeerDescription clusterPeerDescription = new 
PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://";));
@@ -604,13 +390,12 @@ public class EndpointConnectionPool {
         }
 
         final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
-        persistPeerStatuses(peerStatuses);
 
         try {
             clientProtocol.shutdown(peer);
         } catch (final IOException e) {
             final String message = String.format("%s Failed to shutdown 
protocol when updating list of peers due to %s", this, e.toString());
-            warn(message);
+            warn(logger, eventReporter, message);
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
@@ -620,7 +405,7 @@ public class EndpointConnectionPool {
             peer.close();
         } catch (final IOException e) {
             final String message = String.format("%s Failed to close resources 
when updating list of peers due to %s", this, e.toString());
-            warn(message);
+            warn(logger, eventReporter, message);
             if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
@@ -629,60 +414,13 @@ public class EndpointConnectionPool {
         return peerStatuses;
     }
 
-    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-        if (peersFile == null) {
-            return;
-        }
-
-        try (final OutputStream fos = new FileOutputStream(peersFile);
-                final OutputStream out = new BufferedOutputStream(fos)) {
-
-            for (final PeerStatus status : statuses) {
-                final PeerDescription description = 
status.getPeerDescription();
-                final String line = description.getHostname() + ":" + 
description.getPort() + ":" + description.isSecure() + "\n";
-                out.write(line.getBytes(StandardCharsets.UTF_8));
-            }
-
-        } catch (final IOException e) {
-            error("Failed to persist list of Peers due to {}; if restarted and 
peer's NCM is down, may be unable to transfer data until communications with 
NCM are restored", e.toString());
-            logger.error("", e);
-        }
-    }
-
-    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) 
throws IOException {
-        if (!file.exists()) {
-            return null;
-        }
-
-        final Set<PeerStatus> statuses = new HashSet<>();
-        try (final InputStream fis = new FileInputStream(file);
-                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(fis))) {
-
-            String line;
-            while ((line = reader.readLine()) != null) {
-                final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3) {
-                    continue;
-                }
-
-                final String hostname = splits[0];
-                final int port = Integer.parseInt(splits[1]);
-                final boolean secure = Boolean.parseBoolean(splits[2]);
-
-                statuses.add(new PeerStatus(new PeerDescription(hostname, 
port, secure), 1));
-            }
-        }
-
-        return statuses;
-    }
-
     private CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
         final PeerDescription description = peerStatus.getPeerDescription();
         return establishSiteToSiteConnection(description.getHostname(), 
description.getPort());
     }
 
     private CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port) throws IOException {
-        final boolean siteToSiteSecure = isSecure();
+        final boolean siteToSiteSecure = siteInfoProvider.isSecure();
         final String destinationUri = "nifi://" + hostname + ":" + port;
 
         CommunicationsSession commsSession = null;
@@ -724,66 +462,6 @@ public class EndpointConnectionPool {
         return commsSession;
     }
 
-    static List<PeerStatus> formulateDestinationList(final 
ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
-        final Collection<NodeInformation> nodeInfoSet = 
clusterNodeInfo.getNodeInformation();
-        final int numDestinations = Math.max(128, nodeInfoSet.size());
-        final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
-
-        long totalFlowFileCount = 0L;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            totalFlowFileCount += nodeInfo.getTotalFlowFiles();
-        }
-
-        int totalEntries = 0;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            final int flowFileCount = nodeInfo.getTotalFlowFiles();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) 
flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (direction == 
TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * 
relativeWeighting));
-
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
-        }
-
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i = 0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for (final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet()) {
-            final NodeInformation nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-
-            int skipIndex = numEntries;
-            for (int i = 0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if (status == null) {
-                        final PeerDescription description = new 
PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), 
nodeInfo.isSiteToSiteSecure());
-                        status = new PeerStatus(description, 
nodeInfo.getTotalFlowFiles());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
-            }
-        }
-
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for (final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet()) {
-            final double percentage = entry.getValue() * 100D / 
destinations.size();
-            
distributionDescription.append("\n").append(entry.getKey()).append(" will 
receive ").append(percentage).append("% of data");
-        }
-        logger.info(distributionDescription.toString());
-
-        // Jumble the list of destinations.
-        return destinations;
-    }
-
     private void cleanupExpiredSockets() {
         for (final BlockingQueue<EndpointConnection> connectionQueue : 
connectionQueueMap.values()) {
             final List<EndpointConnection> connections = new ArrayList<>();
@@ -813,7 +491,7 @@ public class EndpointConnectionPool {
     public void shutdown() {
         shutdown = true;
         taskExecutor.shutdown();
-        peerTimeoutExpirations.clear();
+        peerSelector.clear();
 
         for (final EndpointConnection conn : activeConnections) {
             conn.getPeer().getCommunicationsSession().interrupt();
@@ -832,138 +510,11 @@ public class EndpointConnectionPool {
         cleanup(connection.getSocketClientProtocol(), connection.getPeer());
     }
 
-    private void refreshPeers() {
-        final PeerStatusCache existingCache = peerStatusCache;
-        if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
-            return;
-        }
-
-        try {
-            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
-            peerStatusCache = new PeerStatusCache(statuses);
-            logger.info("{} Successfully refreshed Peer Status; remote 
instance consists of {} peers", this, statuses.size());
-        } catch (Exception e) {
-            warn("{} Unable to refresh Remote Group's peers due to {}", this, 
e);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-        }
-    }
-
-    public String getInputPortIdentifier(final String portName) throws 
IOException {
-        return getPortIdentifier(portName, inputPortMap);
-    }
-
-    public String getOutputPortIdentifier(final String portName) throws 
IOException {
-        return getPortIdentifier(portName, outputPortMap);
-    }
-
-    private String getPortIdentifier(final String portName, final Map<String, 
String> portMap) throws IOException {
-        String identifier;
-        remoteInfoReadLock.lock();
-        try {
-            identifier = portMap.get(portName);
-        } finally {
-            remoteInfoReadLock.unlock();
-        }
-
-        if (identifier != null) {
-            return identifier;
-        }
-
-        refreshRemoteInfo();
-
-        remoteInfoReadLock.lock();
-        try {
-            return portMap.get(portName);
-        } finally {
-            remoteInfoReadLock.unlock();
-        }
-    }
-
-    private ControllerDTO refreshRemoteInfo() throws IOException {
-        final boolean webInterfaceSecure = 
clusterUrl.toString().startsWith("https");
-        final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? 
sslContext : null);
-        final ControllerDTO controller = utils.getController(apiUri + 
"/controller", commsTimeout);
-
-        remoteInfoWriteLock.lock();
-        try {
-            this.siteToSitePort = controller.getRemoteSiteListeningPort();
-            this.siteToSiteSecure = controller.isSiteToSiteSecure();
-
-            inputPortMap.clear();
-            for (final PortDTO inputPort : controller.getInputPorts()) {
-                inputPortMap.put(inputPort.getName(), inputPort.getId());
-            }
-
-            outputPortMap.clear();
-            for (final PortDTO outputPort : controller.getOutputPorts()) {
-                outputPortMap.put(outputPort.getName(), outputPort.getId());
-            }
-
-            this.remoteRefreshTime = System.currentTimeMillis();
-        } finally {
-            remoteInfoWriteLock.unlock();
-        }
-
-        return controller;
-    }
-
-    /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    private Integer getSiteToSitePort() throws IOException {
-        Integer listeningPort;
-        remoteInfoReadLock.lock();
-        try {
-            listeningPort = this.siteToSitePort;
-            if (listeningPort != null && this.remoteRefreshTime > 
System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
-                return listeningPort;
-            }
-        } finally {
-            remoteInfoReadLock.unlock();
-        }
-
-        final ControllerDTO controller = refreshRemoteInfo();
-        listeningPort = controller.getRemoteSiteListeningPort();
-
-        return listeningPort;
-    }
-
     @Override
     public String toString() {
         return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
     }
 
-    /**
-     * @return {@code true} if the remote instance is configured for secure
-     * site-to-site communications, {@code false} otherwise
-     * @throws IOException if unable to check if secure
-     */
-    public boolean isSecure() throws IOException {
-        remoteInfoReadLock.lock();
-        try {
-            final Boolean secure = this.siteToSiteSecure;
-            if (secure != null && this.remoteRefreshTime > 
System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
-                return secure;
-            }
-        } finally {
-            remoteInfoReadLock.unlock();
-        }
-
-        final ControllerDTO controller = refreshRemoteInfo();
-        final Boolean isSecure = controller.isSiteToSiteSecure();
-        if (isSecure == null) {
-            throw new IOException("Remote NiFi instance " + clusterUrl + " is 
not currently configured to accept site-to-site connections");
-        }
-
-        return isSecure;
-    }
-
     private class IdEnrichedRemoteDestination implements RemoteDestination {
 
         private final RemoteDestination original;
@@ -994,4 +545,6 @@ public class EndpointConnectionPool {
             return original.isUseCompression();
         }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 33e4a66..d04234f 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -16,27 +16,26 @@
  */
 package org.apache.nifi.remote.client.socket;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.remote.Communicant;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.AbstractSiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.util.ObjectHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SocketClient implements SiteToSiteClient {
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SocketClient extends AbstractSiteToSiteClient {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SocketClient.class);
 
-    private final SiteToSiteClientConfig config;
     private final EndpointConnectionPool pool;
     private final boolean compress;
     private final String portName;
@@ -45,13 +44,17 @@ public class SocketClient implements SiteToSiteClient {
     private volatile boolean closed = false;
 
     public SocketClient(final SiteToSiteClientConfig config) {
-        pool = new EndpointConnectionPool(config.getUrl(),
+        super(config);
+
+        final int commsTimeout = (int) 
config.getTimeout(TimeUnit.MILLISECONDS);
+        pool = new EndpointConnectionPool(clusterUrl,
                 createRemoteDestination(config.getPortIdentifier(), 
config.getPortName()),
-                (int) config.getTimeout(TimeUnit.MILLISECONDS),
+                commsTimeout,
                 (int) 
config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
-                config.getSslContext(), config.getEventReporter(), 
config.getPeerPersistenceFile());
+                config.getSslContext(), config.getEventReporter(), 
config.getPeerPersistenceFile(),
+                siteInfoProvider
+        );
 
-        this.config = config;
         this.compress = config.isUseCompression();
         this.portIdentifier = config.getPortIdentifier();
         this.portName = config.getPortName();
@@ -59,13 +62,8 @@ public class SocketClient implements SiteToSiteClient {
     }
 
     @Override
-    public SiteToSiteClientConfig getConfig() {
-        return config;
-    }
-
-    @Override
     public boolean isSecure() throws IOException {
-        return pool.isSecure();
+        return siteInfoProvider.isSecure();
     }
 
     private String getPortIdentifier(final TransferDirection direction) throws 
IOException {
@@ -76,9 +74,9 @@ public class SocketClient implements SiteToSiteClient {
 
         final String portId;
         if (direction == TransferDirection.SEND) {
-            portId = pool.getInputPortIdentifier(this.portName);
+            portId = siteInfoProvider.getInputPortIdentifier(this.portName);
         } else {
-            portId = pool.getOutputPortIdentifier(this.portName);
+            portId = siteInfoProvider.getOutputPortIdentifier(this.portName);
         }
 
         if (portId == null) {
@@ -142,7 +140,7 @@ public class SocketClient implements SiteToSiteClient {
 
         // Wrap the transaction in a new one that will return the 
EndpointConnectionState back to the pool whenever
         // the transaction is either completed or canceled.
-        final ObjectHolder<EndpointConnection> connectionStateRef = new 
ObjectHolder<>(connectionState);
+        final AtomicReference<EndpointConnection> connectionStateRef = new 
AtomicReference<>(connectionState);
         return new Transaction() {
             @Override
             public void confirm() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
index 6ca5812..61df4cf 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java
@@ -20,6 +20,8 @@ public class AdaptedNodeInformation {
 
     private String hostname;
     private Integer siteToSitePort;
+    private Integer siteToSiteHttpApiPort;
+
     private int apiPort;
     private boolean isSiteToSiteSecure;
     private int totalFlowFiles;
@@ -63,4 +65,13 @@ public class AdaptedNodeInformation {
     public void setTotalFlowFiles(int totalFlowFiles) {
         this.totalFlowFiles = totalFlowFiles;
     }
+
+    public Integer getSiteToSiteHttpApiPort() {
+        return siteToSiteHttpApiPort;
+    }
+
+    public void setSiteToSiteHttpApiPort(Integer siteToSiteHttpApiPort) {
+        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
index abfcc85..c348e13 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
@@ -20,14 +20,16 @@ public class NodeInformation {
 
     private final String siteToSiteHostname;
     private final Integer siteToSitePort;
+    private final Integer siteToSiteHttpApiPort;
     private final int apiPort;
     private final boolean isSiteToSiteSecure;
     private final int totalFlowFiles;
 
-    public NodeInformation(final String siteToSiteHostname, final Integer 
siteToSitePort, final int apiPort,
-            final boolean isSiteToSiteSecure, final int totalFlowFiles) {
+    public NodeInformation(final String siteToSiteHostname, final Integer 
siteToSitePort, final Integer siteToSiteHttpApiPort,
+            final int apiPort, final boolean isSiteToSiteSecure, final int 
totalFlowFiles) {
         this.siteToSiteHostname = siteToSiteHostname;
         this.siteToSitePort = siteToSitePort;
+        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
         this.apiPort = apiPort;
         this.isSiteToSiteSecure = isSiteToSiteSecure;
         this.totalFlowFiles = totalFlowFiles;
@@ -45,6 +47,10 @@ public class NodeInformation {
         return siteToSitePort;
     }
 
+    public Integer getSiteToSiteHttpApiPort() {
+        return siteToSiteHttpApiPort;
+    }
+
     public boolean isSiteToSiteSecure() {
         return isSiteToSiteSecure;
     }
@@ -77,6 +83,16 @@ public class NodeInformation {
         } else if (siteToSitePort != null && siteToSitePort.intValue() != 
other.siteToSitePort.intValue()) {
             return false;
         }
+
+        if (siteToSiteHttpApiPort == null && other.siteToSiteHttpApiPort != 
null) {
+            return false;
+        }
+        if (siteToSiteHttpApiPort != null && other.siteToSiteHttpApiPort == 
null) {
+            return false;
+        } else if (siteToSiteHttpApiPort != null && 
siteToSiteHttpApiPort.intValue() != other.siteToSiteHttpApiPort.intValue()) {
+            return false;
+        }
+
         if (apiPort != other.apiPort) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
index b2dead0..c17849e 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
@@ -24,7 +24,8 @@ public class NodeInformationAdapter extends 
XmlAdapter<AdaptedNodeInformation, N
 
     @Override
     public NodeInformation unmarshal(final AdaptedNodeInformation adapted) 
throws Exception {
-        return new NodeInformation(adapted.getHostname(), 
adapted.getSiteToSitePort(), adapted.getApiPort(), 
adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles());
+        return new NodeInformation(adapted.getHostname(), 
adapted.getSiteToSitePort(), adapted.getSiteToSiteHttpApiPort(),
+                adapted.getApiPort(), adapted.isSiteToSiteSecure(), 
adapted.getTotalFlowFiles());
     }
 
     @Override
@@ -32,6 +33,7 @@ public class NodeInformationAdapter extends 
XmlAdapter<AdaptedNodeInformation, N
         final AdaptedNodeInformation adapted = new AdaptedNodeInformation();
         adapted.setHostname(nodeInformation.getSiteToSiteHostname());
         adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort());
+        
adapted.setSiteToSiteHttpApiPort(nodeInformation.getSiteToSiteHttpApiPort());
         adapted.setApiPort(nodeInformation.getAPIPort());
         adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());
         adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles());

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
index 198aaef..ec20e50 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.remote.exception;
 
+import org.apache.nifi.remote.protocol.ResponseCode;
+
 import java.io.IOException;
 
 /**
@@ -28,11 +30,24 @@ public class HandshakeException extends IOException {
 
     private static final long serialVersionUID = 178192341908726L;
 
+    private final ResponseCode responseCode;
+
     public HandshakeException(final String message) {
         super(message);
+        this.responseCode = null;
     }
 
     public HandshakeException(final Throwable cause) {
         super(cause);
+        this.responseCode = null;
+    }
+
+    public HandshakeException(final ResponseCode responseCode, final String 
message) {
+        super(message);
+        this.responseCode = responseCode;
+    }
+
+    public ResponseCode getResponseCode() {
+        return responseCode;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
new file mode 100644
index 0000000..d561833
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpCommunicationsSession.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.http;
+
+import org.apache.nifi.remote.AbstractCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+import java.io.IOException;
+
+public class HttpCommunicationsSession extends AbstractCommunicationsSession {
+
+    protected int timeout = 30000;
+
+    protected final HttpInput input;
+    protected final HttpOutput output;
+    protected String checksum;
+
+    public HttpCommunicationsSession(){
+        super(null);
+        this.input = new HttpInput();
+        this.output = new HttpOutput();
+    }
+
+    @Override
+    public void setTimeout(final int millis) throws IOException {
+        this.timeout = millis;
+    }
+
+    @Override
+    public int getTimeout() throws IOException {
+        return timeout;
+    }
+
+    @Override
+    public CommunicationsInput getInput() {
+        return input;
+    }
+
+    @Override
+    public CommunicationsOutput getOutput() {
+        return output;
+    }
+
+    @Override
+    public boolean isDataAvailable() {
+        return false;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        return output.getBytesWritten();
+    }
+
+    @Override
+    public long getBytesRead() {
+        return input.getBytesRead();
+    }
+
+    @Override
+    public void interrupt() {
+    }
+
+    @Override
+    public boolean isClosed() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    public String getChecksum() {
+        return checksum;
+    }
+
+    public void setChecksum(String checksum) {
+        this.checksum = checksum;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
new file mode 100644
index 0000000..5048306
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpInput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.http;
+
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class HttpInput implements CommunicationsInput {
+
+    private ByteCountingInputStream countingIn;
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+        return countingIn;
+    }
+
+    @Override
+    public long getBytesRead() {
+        if (countingIn != null) {
+            return countingIn.getBytesRead();
+        }
+        return 0L;
+    }
+
+    @Override
+    public void consume() throws IOException {
+        if (countingIn == null) {
+            return;
+        }
+
+        final byte[] b = new byte[4096];
+        int bytesRead;
+        do {
+            bytesRead = countingIn.read(b);
+        } while (bytesRead > 0);
+    }
+
+    public void setInputStream(InputStream inputStream) {
+        this.countingIn = new ByteCountingInputStream(inputStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
new file mode 100644
index 0000000..b78be18
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpOutput.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.http;
+
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class HttpOutput implements CommunicationsOutput {
+
+    private ByteCountingOutputStream countingOut;
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return countingOut;
+    }
+
+    @Override
+    public long getBytesWritten() {
+        if (countingOut != null) {
+            return countingOut.getBytesWritten();
+        }
+        return 0L;
+    }
+
+    public void setOutputStream(OutputStream outputStream) {
+        this.countingOut = new ByteCountingOutputStream(outputStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
new file mode 100644
index 0000000..ae12c67
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/http/HttpServerCommunicationsSession.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.http;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.protocol.ResponseCode;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpServerCommunicationsSession extends HttpCommunicationsSession 
{
+
+    private final Map<String, String> handshakeParams = new HashMap<>();
+    private final String transactionId;
+    private Transaction.TransactionState status = 
Transaction.TransactionState.TRANSACTION_STARTED;
+    private ResponseCode responseCode;
+
+    public HttpServerCommunicationsSession(InputStream inputStream, 
OutputStream outputStream, String transactionId){
+        super();
+        input.setInputStream(inputStream);
+        output.setOutputStream(outputStream);
+        this.transactionId = transactionId;
+    }
+
+    // This status is only needed by HttpFlowFileServerProtocol, 
HttpClientTransaction has its own status.
+    // Because multiple HttpFlowFileServerProtocol instances have to carry on 
a single transaction
+    // throughout multiple HTTP requests, status has to be embedded here.
+    public Transaction.TransactionState getStatus() {
+        return status;
+    }
+
+    public void setStatus(Transaction.TransactionState status) {
+        this.status = status;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public ResponseCode getResponseCode() {
+        return responseCode;
+    }
+
+    public void setResponseCode(ResponseCode responseCode) {
+        this.responseCode = responseCode;
+    }
+
+    public void putHandshakeParam(HandshakeProperty key, String value) {
+        handshakeParams.put(key.name(), value);
+    }
+
+    public Map<String, String> getHandshakeParams() {
+        return handshakeParams;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 2efea11..1f21faf 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -16,11 +16,6 @@
  */
 package org.apache.nifi.remote.protocol;
 
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.Transaction;
@@ -32,6 +27,9 @@ import 
org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 
+import java.io.IOException;
+import java.util.Set;
+
 public interface ClientProtocol extends VersionedRemoteResource {
 
     void handshake(Peer peer) throws IOException, HandshakeException, 
UnknownPortException, PortNotRunningException;
@@ -40,14 +38,8 @@ public interface ClientProtocol extends 
VersionedRemoteResource {
 
     FlowFileCodec negotiateCodec(Peer peer) throws IOException, 
ProtocolException;
 
-    int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
-
-    int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
-
     void shutdown(Peer peer) throws IOException, ProtocolException;
 
-    boolean isReadyForFileTransfer();
-
     Transaction startTransaction(Peer peer, FlowFileCodec codec, 
TransferDirection direction) throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java
new file mode 100644
index 0000000..34274ae
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/HandshakeProperty.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket
+ * Protocol.
+ */
+public enum HandshakeProperty {
+
+    /**
+     * Boolean value indicating whether or not the contents of a FlowFile 
should
+     * be GZipped when transferred.
+     */
+    GZIP,
+    /**
+     * The unique identifier of the port to communicate with
+     */
+    PORT_IDENTIFIER,
+    /**
+     * Indicates the number of milliseconds after the request was made that the
+     * client will wait for a response. If no response has been received by the
+     * time this value expires, the server can move on without attempting to
+     * service the request because the client will have already disconnected.
+     */
+    REQUEST_EXPIRATION_MILLIS,
+    /**
+     * The preferred number of FlowFiles that the server should send to the
+     * client when pulling data. This property was introduced in version 5 of
+     * the protocol.
+     */
+    BATCH_COUNT,
+    /**
+     * The preferred number of bytes that the server should send to the client
+     * when pulling data. This property was introduced in version 5 of the
+     * protocol.
+     */
+    BATCH_SIZE,
+    /**
+     * The preferred amount of time that the server should send data to the
+     * client when pulling data. This property was introduced in version 5 of
+     * the protocol. Value is in milliseconds.
+     */
+    BATCH_DURATION;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java
new file mode 100644
index 0000000..0dcd1d0
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/Response.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public class Response {
+
+    private final ResponseCode code;
+    private final String message;
+
+    private Response(final ResponseCode code, final String explanation) {
+        this.code = code;
+        this.message = explanation;
+    }
+
+    public ResponseCode getCode() {
+        return code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public static Response read(final DataInputStream in) throws IOException, 
ProtocolException {
+        final ResponseCode code = ResponseCode.readCode(in);
+        final String message = code.containsMessage() ? in.readUTF() : null;
+        return new Response(code, message);
+    }
+
+    @Override
+    public String toString() {
+        return code + ": " + message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
new file mode 100644
index 0000000..18594e7
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ResponseCode.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+public enum ResponseCode {
+
+    RESERVED(0, "Reserved for Future Use", false), // This will likely be used 
if we ever need to expand the length of
+    // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
+    // handshaking properties
+    PROPERTIES_OK(1, "Properties OK", false),
+    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
+    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
+    MISSING_PROPERTY(232, "Missing Property", true),
+    // transaction indicators
+    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
+    FINISH_TRANSACTION(11, "Finish Transaction", false),
+    CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of 
this code is the checksum
+    TRANSACTION_FINISHED(13, "Transaction Finished", false),
+    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But 
Destination is Full", false),
+    CANCEL_TRANSACTION(15, "Cancel Transaction", true),
+    BAD_CHECKSUM(19, "Bad Checksum", false),
+    // data availability indicators
+    MORE_DATA(20, "More Data Exists", false),
+    NO_MORE_DATA(21, "No More Data Exists", false),
+    // port state indicators
+    UNKNOWN_PORT(200, "Unknown Port", false),
+    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
+    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
+    // authorization
+    UNAUTHORIZED(240, "User Not Authorized", true),
+    // error indicators
+    ABORT(250, "Abort", true),
+    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
+    END_OF_STREAM(255, "End of Stream", false);
+
+    private static final ResponseCode[] codeArray = new ResponseCode[256];
+
+    static {
+        for (final ResponseCode responseCode : ResponseCode.values()) {
+            codeArray[responseCode.getCode()] = responseCode;
+        }
+    }
+
+    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
+    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
+    private final int code;
+    private final byte[] codeSequence;
+    private final String description;
+    private final boolean containsMessage;
+
+    private ResponseCode(final int code, final String description, final 
boolean containsMessage) {
+        this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, (byte) code};
+        this.code = code;
+        this.description = description;
+        this.containsMessage = containsMessage;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public byte[] getCodeSequence() {
+        return codeSequence;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+
+    public boolean containsMessage() {
+        return containsMessage;
+    }
+
+    public void writeResponse(final DataOutputStream out) throws IOException {
+        if (containsMessage()) {
+            throw new IllegalArgumentException("ResponseCode " + code + " 
expects an explanation");
+        }
+
+        out.write(getCodeSequence());
+        out.flush();
+    }
+
+    public void writeResponse(final DataOutputStream out, final String 
explanation) throws IOException {
+        if (!containsMessage()) {
+            throw new IllegalArgumentException("ResponseCode " + code + " does 
not expect an explanation");
+        }
+
+        out.write(getCodeSequence());
+        out.writeUTF(explanation);
+        out.flush();
+    }
+
+    static ResponseCode readCode(final InputStream in) throws IOException, 
ProtocolException {
+        final int byte1 = in.read();
+        if (byte1 < 0) {
+            throw new EOFException();
+        } else if (byte1 != CODE_SEQUENCE_VALUE_1) {
+            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
+        }
+
+        final int byte2 = in.read();
+        if (byte2 < 0) {
+            throw new EOFException();
+        } else if (byte2 != CODE_SEQUENCE_VALUE_2) {
+            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
+        }
+
+        final int byte3 = in.read();
+        if (byte3 < 0) {
+            throw new EOFException();
+        }
+
+        final ResponseCode responseCode = codeArray[byte3];
+        if (responseCode == null) {
+            throw new ProtocolException("Received Response Code of " + byte3 + 
" but do not recognize this code");
+        }
+        return responseCode;
+    }
+
+    public static ResponseCode fromSequence(final byte[] value) {
+        final int code = value[3] & 0xFF;
+        return fromCode(code);
+    }
+
+    public static ResponseCode fromCode(final int code) {
+        final ResponseCode responseCode = codeArray[code];
+        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : 
responseCode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java
new file mode 100644
index 0000000..b67506d
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/SiteToSiteTransportProtocol.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+public enum SiteToSiteTransportProtocol {
+    RAW,
+    HTTP
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
new file mode 100644
index 0000000..d4085ca
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpClientTransaction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.AbstractTransaction;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class HttpClientTransaction extends AbstractTransaction {
+
+    private SiteToSiteRestApiClient apiClient;
+    private String transactionUrl;
+
+    public HttpClientTransaction(final int protocolVersion, final Peer peer, 
TransferDirection direction,
+                                 final boolean useCompression, final String 
portId, int penaltyMillis, EventReporter eventReporter) throws IOException {
+        super(peer, direction, useCompression, new StandardFlowFileCodec(), 
eventReporter, protocolVersion, penaltyMillis, portId);
+    }
+
+    public void initialize(SiteToSiteRestApiClient apiUtil, String 
transactionUrl) throws IOException {
+        this.transactionUrl = transactionUrl;
+        this.apiClient = apiUtil;
+        if(TransferDirection.RECEIVE.equals(direction)){
+            dataAvailable = apiUtil.openConnectionForReceive(transactionUrl, 
peer.getCommunicationsSession());
+        } else {
+            apiUtil.openConnectionForSend(transactionUrl, 
peer.getCommunicationsSession());
+        }
+    }
+
+    @Override
+    protected Response readTransactionResponse() throws IOException {
+        HttpCommunicationsSession commSession = (HttpCommunicationsSession) 
peer.getCommunicationsSession();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        if(TransferDirection.RECEIVE.equals(direction)){
+            switch (state){
+                case TRANSACTION_STARTED:
+                case DATA_EXCHANGED:
+                    logger.debug("{} {} readTransactionResponse. checksum={}", 
this, peer, commSession.getChecksum());
+                    if(StringUtils.isEmpty(commSession.getChecksum())){
+                        // We don't know if there's more data to receive, so 
just continue it.
+                        ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+                    } else {
+                        // We got a checksum to send to server.
+                        if 
(TransactionState.TRANSACTION_STARTED.equals(state)) {
+                            logger.debug("{} {} There's no transaction to 
confirm.", this, peer);
+                            
ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+                        } else {
+                            TransactionResultEntity transactionResult
+                                    = 
apiClient.commitReceivingFlowFiles(transactionUrl, 
ResponseCode.CONFIRM_TRANSACTION, commSession.getChecksum());
+                            ResponseCode responseCode = 
ResponseCode.fromCode(transactionResult.getResponseCode());
+                            if(responseCode.containsMessage()){
+                                String message = 
transactionResult.getMessage();
+                                responseCode.writeResponse(dos, message == 
null ? "" : message);
+                            } else {
+                                responseCode.writeResponse(dos);
+                            }
+                        }
+                    }
+                    break;
+            }
+        } else {
+            switch (state){
+                case DATA_EXCHANGED:
+                    // Some flow files have been sent via stream, finish 
transferring.
+                    apiClient.finishTransferFlowFiles(commSession);
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, 
commSession.getChecksum());
+                    break;
+                case TRANSACTION_CONFIRMED:
+                    TransactionResultEntity resultEntity = 
apiClient.commitTransferFlowFiles(transactionUrl, 
ResponseCode.CONFIRM_TRANSACTION);
+                    ResponseCode responseCode = 
ResponseCode.fromCode(resultEntity.getResponseCode());
+                    if(responseCode.containsMessage()){
+                        responseCode.writeResponse(dos, 
resultEntity.getMessage());
+                    } else {
+                        responseCode.writeResponse(dos);
+                    }
+                    break;
+            }
+        }
+        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+        return Response.read(new DataInputStream(bis));
+    }
+
+    @Override
+    protected void writeTransactionResponse(ResponseCode response, String 
explanation) throws IOException {
+        HttpCommunicationsSession commSession = (HttpCommunicationsSession) 
peer.getCommunicationsSession();
+        if(TransferDirection.RECEIVE.equals(direction)){
+            switch (response) {
+                case CONFIRM_TRANSACTION:
+                    logger.debug("{} Confirming transaction. checksum={}", 
this, explanation);
+                    commSession.setChecksum(explanation);
+                    break;
+                case TRANSACTION_FINISHED:
+                    logger.debug("{} Finishing transaction.", this);
+                    break;
+                case CANCEL_TRANSACTION:
+                    logger.debug("{} Canceling transaction. explanation={}", 
this, explanation);
+                    TransactionResultEntity resultEntity = 
apiClient.commitReceivingFlowFiles(transactionUrl, 
ResponseCode.CANCEL_TRANSACTION, null);
+                    ResponseCode cancelResponse = 
ResponseCode.fromCode(resultEntity.getResponseCode());
+                    switch (cancelResponse) {
+                        case CANCEL_TRANSACTION:
+                            logger.debug("{} CANCEL_TRANSACTION, The 
transaction is canceled on server properly.", this);
+                            break;
+                        default:
+                            logger.warn("{} CANCEL_TRANSACTION, Expected the 
transaction is canceled on server, but received {}.", this, cancelResponse);
+                            break;
+                    }
+                    break;
+            }
+        } else {
+            switch (response) {
+                case FINISH_TRANSACTION:
+                    // The actual HTTP request will be sent in 
readTransactionResponse.
+                    logger.debug("{} Finished sending flow files.", this);
+                    break;
+                case BAD_CHECKSUM: {
+                        TransactionResultEntity resultEntity = 
apiClient.commitTransferFlowFiles(transactionUrl, ResponseCode.BAD_CHECKSUM);
+                        ResponseCode badChecksumCancelResponse = 
ResponseCode.fromCode(resultEntity.getResponseCode());
+                        switch (badChecksumCancelResponse) {
+                            case CANCEL_TRANSACTION:
+                                logger.debug("{} BAD_CHECKSUM, The transaction 
is canceled on server properly.", this);
+                                break;
+                            default:
+                                logger.warn("{} BAD_CHECKSUM, Expected the 
transaction is canceled on server, but received {}.", this, 
badChecksumCancelResponse);
+                                break;
+                        }
+
+                    }
+                    break;
+                case CONFIRM_TRANSACTION:
+                    // The actual HTTP request will be sent in 
readTransactionResponse.
+                    logger.debug("{} Transaction is confirmed.", this);
+                    break;
+                case CANCEL_TRANSACTION: {
+                        logger.debug("{} Canceling transaction.", this);
+                        TransactionResultEntity resultEntity = 
apiClient.commitTransferFlowFiles(transactionUrl, 
ResponseCode.CANCEL_TRANSACTION);
+                        ResponseCode cancelResponse = 
ResponseCode.fromCode(resultEntity.getResponseCode());
+                        switch (cancelResponse) {
+                            case CANCEL_TRANSACTION:
+                                logger.debug("{} CANCEL_TRANSACTION, The 
transaction is canceled on server properly.", this);
+                                break;
+                            default:
+                                logger.warn("{} CANCEL_TRANSACTION, Expected 
the transaction is canceled on server, but received {}.", this, cancelResponse);
+                                break;
+                        }
+                    }
+                    break;
+            }
+        }
+    }
+
+
+    @Override
+    protected void close() throws IOException {
+        if (apiClient != null) {
+            apiClient.close();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java
new file mode 100644
index 0000000..d15b335
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpHeaders.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+public class HttpHeaders {
+
+    public static final String LOCATION_HEADER_NAME = "Location";
+    public static final String LOCATION_URI_INTENT_NAME = 
"x-location-uri-intent";
+    public static final String LOCATION_URI_INTENT_VALUE = "transaction-url";
+
+    public static final String ACCEPT_ENCODING = "Accept-Encoding";
+    public static final String CONTENT_ENCODING = "Content-Encoding";
+    public static final String PROTOCOL_VERSION = 
"x-nifi-site-to-site-protocol-version";
+    public static final String SERVER_SIDE_TRANSACTION_TTL = 
"x-nifi-site-to-site-server-transaction-ttl";
+    public static final String HANDSHAKE_PROPERTY_USE_COMPRESSION = 
"x-nifi-site-to-site-use-compression";
+    public static final String HANDSHAKE_PROPERTY_REQUEST_EXPIRATION = 
"x-nifi-site-to-site-request-expiration";
+    public static final String HANDSHAKE_PROPERTY_BATCH_COUNT = 
"x-nifi-site-to-site-batch-count";
+    public static final String HANDSHAKE_PROPERTY_BATCH_SIZE = 
"x-nifi-site-to-site-batch-size";
+    public static final String HANDSHAKE_PROPERTY_BATCH_DURATION = 
"x-nifi-site-to-site-batch-duration";
+
+}

Reply via email to