http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 809147e..a5d4bbe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -16,6 +16,19 @@
  */
 package org.apache.nifi.remote;
 
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,24 +43,9 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.remote.cluster.NodeInformant;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.ServerProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class SocketRemoteSiteListener implements RemoteSiteListener {
 
     public static final String DEFAULT_FLOWFILE_PATH = "./";
@@ -261,11 +259,11 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                                                 break;
                                             case RECEIVE_FLOWFILES:
                                                 // peer wants to receive 
FlowFiles, so we will transfer FlowFiles.
-                                                
protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, 
String>());
+                                                
protocol.getPort().transferFlowFiles(peer, protocol);
                                                 break;
                                             case SEND_FLOWFILES:
                                                 // Peer wants to send 
FlowFiles, so we will receive.
-                                                
protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, 
String>());
+                                                
protocol.getPort().receiveFlowFiles(peer, protocol);
                                                 break;
                                             case REQUEST_PEER_LIST:
                                                 protocol.sendPeerList(peer);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 02a44b7..3f59b50 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -47,6 +47,7 @@ import 
org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -137,10 +138,13 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             .url(remoteGroup.getTargetUri().toString())
             .portIdentifier(getIdentifier())
             .sslContext(sslContext)
+            .useCompression(isUseCompression())
             .eventReporter(remoteGroup.getEventReporter())
             .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
             .nodePenalizationPeriod(penalizationMillis, TimeUnit.MILLISECONDS)
             
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS)
+            .transportProtocol(remoteGroup.getTransportProtocol())
+            .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), 
remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), 
remoteGroup.getProxyPassword()))
             .build();
         clientRef.set(client);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 7507935..58f4804 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -50,7 +50,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -236,7 +235,8 @@ public class StandardRootGroupPort extends AbstractPort 
implements RootGroupPort
             return;
         }
 
-        session.commit();
+        // TODO: Confirm this. Session.commit here is not required since it 
has been committed inside receiveFlowFiles/transferFlowFiles.
+        // session.commit();
         responseQueue.add(new ProcessingResult(transferCount));
     }
 
@@ -451,7 +451,7 @@ public class StandardRootGroupPort extends AbstractPort 
implements RootGroupPort
     }
 
     @Override
-    public int receiveFlowFiles(final Peer peer, final ServerProtocol 
serverProtocol, final Map<String, String> requestHeaders)
+    public int receiveFlowFiles(final Peer peer, final ServerProtocol 
serverProtocol)
             throws NotAuthorizedException, BadRequestException, 
RequestExpiredException {
         if (getConnectableType() != ConnectableType.INPUT_PORT) {
             throw new IllegalStateException("Cannot receive FlowFiles because 
this port is not an Input Port");
@@ -505,7 +505,7 @@ public class StandardRootGroupPort extends AbstractPort 
implements RootGroupPort
     }
 
     @Override
-    public int transferFlowFiles(final Peer peer, final ServerProtocol 
serverProtocol, final Map<String, String> requestHeaders)
+    public int transferFlowFiles(final Peer peer, final ServerProtocol 
serverProtocol)
             throws NotAuthorizedException, BadRequestException, 
RequestExpiredException {
         if (getConnectableType() != ConnectableType.OUTPUT_PORT) {
             throw new IllegalStateException("Cannot send FlowFiles because 
this port is not an Output Port");

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
new file mode 100644
index 0000000..43428e0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PortAuthorizationResult;
+import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+public abstract class AbstractFlowFileServerProtocol implements ServerProtocol 
{
+
+    protected ProcessGroup rootGroup;
+    protected RootGroupPort port;
+
+    protected boolean handshakeCompleted;
+    protected boolean shutdown = false;
+    protected FlowFileCodec negotiatedFlowFileCodec = null;
+
+    protected HandshakenProperties handshakenProperties;
+
+    protected static final long DEFAULT_BATCH_NANOS = 
TimeUnit.SECONDS.toNanos(5L);
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public void setRootProcessGroup(final ProcessGroup group) {
+        if (!group.isRootGroup()) {
+            throw new IllegalArgumentException("Specified group was not a root 
group.");
+        }
+        this.rootGroup = group;
+    }
+
+    @Override
+    public boolean isHandshakeSuccessful() {
+        return handshakeCompleted;
+    }
+
+    protected void validateHandshakeRequest(HandshakenProperties confirmed, 
final Peer peer, final Map<String, String> properties) throws 
HandshakeException {
+        Boolean useGzip = null;
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String propertyName = entry.getKey();
+            final String value = entry.getValue();
+
+            final HandshakeProperty property;
+            try {
+                property = HandshakeProperty.valueOf(propertyName);
+            } catch (final Exception e) {
+                throw new 
HandshakeException(ResponseCode.UNKNOWN_PROPERTY_NAME, "Received unknown 
property: " + propertyName);
+            }
+
+            try {
+                switch (property) {
+                    case GZIP: {
+                        useGzip = Boolean.parseBoolean(value);
+                        confirmed.setUseGzip(useGzip);
+                        break;
+                    }
+                    case REQUEST_EXPIRATION_MILLIS:
+                        confirmed.setExpirationMillis(Long.parseLong(value));
+                        break;
+                    case BATCH_COUNT:
+                        confirmed.setBatchCount(Integer.parseInt(value));
+                        break;
+                    case BATCH_SIZE:
+                        confirmed.setBatchBytes(Long.parseLong(value));
+                        break;
+                    case BATCH_DURATION:
+                        
confirmed.setBatchDurationNanos(TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)));
+                        break;
+                    case PORT_IDENTIFIER: {
+                        checkPortStatus(peer, value);
+                    }
+                }
+            } catch (final NumberFormatException nfe) {
+                throw new 
HandshakeException(ResponseCode.ILLEGAL_PROPERTY_VALUE, "Received invalid value 
for property '" + property + "'; invalid value: " + value);
+            }
+        }
+
+        if (useGzip == null) {
+            logger.debug("Responding with ResponseCode MISSING_PROPERTY 
because GZIP Property missing");
+            throw new HandshakeException(ResponseCode.MISSING_PROPERTY, 
"Missing Property " + HandshakeProperty.GZIP.name());
+        }
+
+    }
+
+    protected void checkPortStatus(final Peer peer, String portId) throws 
HandshakeException {
+        Port receivedPort = rootGroup.getInputPort(portId);
+        if (receivedPort == null) {
+            receivedPort = rootGroup.getOutputPort(portId);
+        }
+        if (receivedPort == null) {
+            logger.debug("Responding with ResponseCode UNKNOWN_PORT for 
identifier {}", portId);
+            throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received 
unknown port identifier: " + portId);
+        }
+        if (!(receivedPort instanceof RootGroupPort)) {
+            logger.debug("Responding with ResponseCode UNKNOWN_PORT for 
identifier {}", portId);
+            throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received 
port identifier " + portId + ", but this Port is not a RootGroupPort");
+        }
+
+        this.port = (RootGroupPort) receivedPort;
+        final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+        if (!portAuthResult.isAuthorized()) {
+            logger.debug("Responding with ResponseCode UNAUTHORIZED: ", 
portAuthResult.getExplanation());
+            throw new HandshakeException(ResponseCode.UNAUTHORIZED, 
portAuthResult.getExplanation());
+        }
+
+        if (!receivedPort.isValid()) {
+            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE 
for {}", receivedPort);
+            throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, 
"Port is not valid");
+        }
+
+        if (!receivedPort.isRunning()) {
+            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE 
for {}", receivedPort);
+            throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, 
"Port not running");
+        }
+
+        // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, 
just ignore this;
+        // we will simply not service the request, but the sender will 
time out.
+        if (getVersionNegotiator().getVersion() > 1) {
+            for (final Connection connection : port.getConnections()) {
+                if (connection.getFlowFileQueue().isFull()) {
+                    logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", port);
+                    throw new 
HandshakeException(ResponseCode.PORTS_DESTINATION_FULL, "Received port 
identifier " + portId + ", but its destination is full");
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public RootGroupPort getPort() {
+        return port;
+    }
+
+    @Override
+    public FlowFileCodec getPreNegotiatedCodec() {
+        return negotiatedFlowFileCodec;
+    }
+
+    @Override
+    public final void handshake(final Peer peer) throws IOException, 
HandshakeException {
+        if (handshakeCompleted) {
+            throw new IllegalStateException("Handshake has already been 
completed");
+        }
+        if (shutdown) {
+            throw new IllegalStateException("Protocol is shutdown");
+        }
+
+        logger.debug("{} Handshaking with {}", this, peer);
+
+        this.handshakenProperties = doHandshake(peer);
+
+        logger.debug("{} Finished handshake with {}", this, peer);
+        handshakeCompleted = true;
+    }
+
+    abstract protected HandshakenProperties doHandshake(final Peer peer) 
throws  IOException, HandshakeException;
+
+    @Override
+    public int transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+        if (!handshakeCompleted) {
+            throw new IllegalStateException("Handshake has not been 
completed");
+        }
+        if (shutdown) {
+            throw new IllegalStateException("Protocol is shutdown");
+        }
+
+        logger.debug("{} Sending FlowFiles to {}", this, peer);
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        String remoteDn = commsSession.getUserDn();
+        if (remoteDn == null) {
+            remoteDn = "none";
+        }
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            // we have no data to send. Notify the peer.
+            logger.debug("{} No data to send to {}", this, peer);
+            writeTransactionResponse(true, ResponseCode.NO_MORE_DATA, 
commsSession);
+            return 0;
+        }
+
+        // we have data to send.
+        logger.debug("{} Data is available to send to {}", this, peer);
+        writeTransactionResponse(true, ResponseCode.MORE_DATA, commsSession);
+
+        final StopWatch stopWatch = new StopWatch(true);
+        long bytesSent = 0L;
+        final Set<FlowFile> flowFilesSent = new HashSet<>();
+        final CRC32 crc = new CRC32();
+
+        // send data until we reach some batch size
+        boolean continueTransaction = true;
+        final long startNanos = System.nanoTime();
+        String calculatedCRC = "";
+        OutputStream os = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+        while (continueTransaction) {
+            final boolean useGzip = handshakenProperties.isUseGzip();
+            final OutputStream flowFileOutputStream = useGzip ? new 
CompressionOutputStream(os) : os;
+            logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, 
peer});
+
+            final CheckedOutputStream checkedOutputStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
+
+            final StopWatch transferWatch = new StopWatch(true);
+
+            final FlowFile toSend = flowFile;
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    final DataPacket dataPacket = new 
StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+                    codec.encode(dataPacket, checkedOutputStream);
+                }
+            });
+
+            final long transmissionMillis = 
transferWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+            // need to close the CompressionOutputStream in order to force it 
to write out any remaining bytes.
+            // Otherwise, do NOT close it because we don't want to close the 
underlying stream
+            // (CompressionOutputStream will not close the underlying stream 
when it's closed)
+            if (useGzip) {
+                checkedOutputStream.close();
+            }
+
+            flowFilesSent.add(flowFile);
+            bytesSent += flowFile.getSize();
+
+            String transitUriPrefix = 
handshakenProperties.getTransitUriPrefix();
+            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
+            session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
+            session.remove(flowFile);
+
+            // determine if we should check for more data on queue.
+            final long sendingNanos = System.nanoTime() - startNanos;
+            boolean poll = true;
+            double batchDurationNanos = 
handshakenProperties.getBatchDurationNanos();
+            if (sendingNanos >= batchDurationNanos && batchDurationNanos > 0L) 
{
+                poll = false;
+            }
+            double batchBytes = handshakenProperties.getBatchBytes();
+            if (bytesSent >= batchBytes && batchBytes > 0L) {
+                poll = false;
+            }
+            double batchCount = handshakenProperties.getBatchCount();
+            if (flowFilesSent.size() >= batchCount && batchCount > 0) {
+                poll = false;
+            }
+
+            if (batchDurationNanos == 0 && batchBytes == 0 && batchCount == 0) 
{
+                poll = (sendingNanos < DEFAULT_BATCH_NANOS);
+            }
+
+            if (poll) {
+                // we've not elapsed the requested sending duration, so get 
more data.
+                flowFile = session.get();
+            } else {
+                flowFile = null;
+            }
+
+            continueTransaction = (flowFile != null);
+            if (continueTransaction) {
+                logger.debug("{} Sending ContinueTransaction indicator to {}", 
this, peer);
+                writeTransactionResponse(true, 
ResponseCode.CONTINUE_TRANSACTION, commsSession);
+            } else {
+                logger.debug("{} Sending FinishTransaction indicator to {}", 
this, peer);
+                writeTransactionResponse(true, 
ResponseCode.FINISH_TRANSACTION, commsSession);
+                calculatedCRC = 
String.valueOf(checkedOutputStream.getChecksum().getValue());
+            }
+        }
+
+        FlowFileTransaction transaction = new FlowFileTransaction(session, 
context, stopWatch, bytesSent, flowFilesSent, calculatedCRC);
+        return commitTransferTransaction(peer, transaction);
+
+    }
+
+    protected int commitTransferTransaction(Peer peer, FlowFileTransaction 
transaction) throws IOException {
+        ProcessSession session = transaction.getSession();
+        Set<FlowFile> flowFilesSent = transaction.getFlowFilesSent();
+
+        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to 
send a 'Confirm Transaction' response
+        CommunicationsSession commsSession = peer.getCommunicationsSession();
+        final Response transactionConfirmationResponse = 
readTransactionResponse(true, commsSession);
+        if (transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION) {
+            // Confirm Checksum and echo back the confirmation.
+            logger.debug("{} Received {}  from {}", this, 
transactionConfirmationResponse, peer);
+            final String receivedCRC = 
transactionConfirmationResponse.getMessage();
+
+            if (getVersionNegotiator().getVersion() > 3) {
+                String calculatedCRC = transaction.getCalculatedCRC();
+                if (!receivedCRC.equals(calculatedCRC)) {
+                    writeTransactionResponse(true, ResponseCode.BAD_CHECKSUM, 
commsSession);
+                    session.rollback();
+                    throw new IOException(this + " Sent data to peer " + peer 
+ " but calculated CRC32 Checksum as "
+                            + calculatedCRC + " while peer calculated CRC32 
Checksum as " + receivedCRC
+                            + "; canceling transaction and rolling back 
session");
+                }
+            }
+
+            writeTransactionResponse(true, ResponseCode.CONFIRM_TRANSACTION, 
commsSession, "");
+
+        } else {
+            throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
+        }
+
+        final String flowFileDescription = flowFilesSent.size() < 20 ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+
+        final Response transactionResponse;
+        try {
+            transactionResponse = readTransactionResponse(true, commsSession);
+        } catch (final IOException e) {
+            logger.error("{} Failed to receive a response from {} when 
expecting a TransactionFinished Indicator."
+                    + " It is unknown whether or not the peer successfully 
received/processed the data."
+                    + " Therefore, {} will be rolled back, possibly resulting 
in data duplication of {}",
+                    this, peer, session, flowFileDescription);
+            session.rollback();
+            throw e;
+        }
+
+        logger.debug("{} received {} from {}", new Object[]{this, 
transactionResponse, peer});
+        if (transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
+            peer.penalize(port.getIdentifier(), 
port.getYieldPeriod(TimeUnit.MILLISECONDS));
+        } else if (transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED) {
+            throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
+        }
+
+        session.commit();
+
+        StopWatch stopWatch = transaction.getStopWatch();
+        long bytesSent = transaction.getBytesSent();
+        stopWatch.stop();
+        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+        final String dataSize = FormatUtils.formatDataSize(bytesSent);
+        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at 
a rate of {}", new Object[]{
+            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
+
+        return flowFilesSent.size();
+    }
+
+    protected Response readTransactionResponse(boolean isTransfer, 
CommunicationsSession commsSession) throws IOException {
+        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
+        return Response.read(dis);
+    }
+
+    protected final void writeTransactionResponse(boolean isTransfer, 
ResponseCode response, CommunicationsSession commsSession) throws IOException {
+        writeTransactionResponse(isTransfer, response, commsSession, null);
+    }
+    protected void writeTransactionResponse(boolean isTransfer, ResponseCode 
response, CommunicationsSession commsSession, String explanation) throws 
IOException {
+        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+        if(explanation == null){
+            response.writeResponse(dos);
+        } else {
+            response.writeResponse(dos, explanation);
+        }
+    }
+
+    @Override
+    public int receiveFlowFiles(final Peer peer, final ProcessContext context, 
final ProcessSession session, final FlowFileCodec codec) throws IOException, 
ProtocolException {
+        if (!handshakeCompleted) {
+            throw new IllegalStateException("Handshake has not been 
completed");
+        }
+        if (shutdown) {
+            throw new IllegalStateException("Protocol is shutdown");
+        }
+
+        logger.debug("{} receiving FlowFiles from {}", this, peer);
+
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
+        String remoteDn = commsSession.getUserDn();
+        if (remoteDn == null) {
+            remoteDn = "none";
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final CRC32 crc = new CRC32();
+
+        // Peer has data. Otherwise, we would not have been called, because 
they would not have sent
+        // a SEND_FLOWFILES request to us. Just decode the bytes into 
FlowFiles until the peer says it's
+        // finished sending data.
+        final Set<FlowFile> flowFilesReceived = new HashSet<>();
+        long bytesReceived = 0L;
+        boolean continueTransaction = true;
+        while (continueTransaction) {
+            final long startNanos = System.nanoTime();
+            final InputStream flowFileInputStream = 
handshakenProperties.isUseGzip() ? new CompressionInputStream(dis) : dis;
+            final CheckedInputStream checkedInputStream = new 
CheckedInputStream(flowFileInputStream, crc);
+
+            final DataPacket dataPacket = codec.decode(checkedInputStream);
+            if(dataPacket == null){
+                logger.debug("{} Received null dataPacket indicating the end 
of transaction from {}", this, peer);
+                break;
+            }
+            FlowFile flowFile = session.create();
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
+
+            final long transferNanos = System.nanoTime() - startNanos;
+            final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+            final String sourceSystemFlowFileUuid = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+            flowFile = session.putAttribute(flowFile, 
CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+            String transitUriPrefix = 
handshakenProperties.getTransitUriPrefix();
+            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
+            session.getProvenanceReporter().receive(flowFile, transitUri, 
sourceSystemFlowFileUuid == null
+                    ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
+            session.transfer(flowFile, Relationship.ANONYMOUS);
+            flowFilesReceived.add(flowFile);
+            bytesReceived += flowFile.getSize();
+
+            final Response transactionResponse = 
readTransactionResponse(false, commsSession);
+            switch (transactionResponse.getCode()) {
+                case CONTINUE_TRANSACTION:
+                    logger.debug("{} Received ContinueTransaction indicator 
from {}", this, peer);
+                    break;
+                case FINISH_TRANSACTION:
+                    logger.debug("{} Received FinishTransaction indicator from 
{}", this, peer);
+                    continueTransaction = false;
+                    break;
+                case CANCEL_TRANSACTION:
+                    logger.info("{} Received CancelTransaction indicator from 
{} with explanation {}", this, peer, transactionResponse.getMessage());
+                    session.rollback();
+                    return 0;
+                default:
+                    throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" 
+ transactionResponse);
+            }
+        }
+
+        // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
+        // to peer so that we can verify that the connection is still open. 
This is a two-phase commit,
+        // which helps to prevent the chances of data duplication. Without 
doing this, we may commit the
+        // session and then when we send the response back to the peer, the 
peer may have timed out and may not
+        // be listening. As a result, it will re-send the data. By doing this 
two-phase commit, we narrow the
+        // Critical Section involved in this transaction so that rather than 
the Critical Section being the
+        // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
+        logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", 
this, peer);
+        String calculatedCRC = String.valueOf(crc.getValue());
+        writeTransactionResponse(false, ResponseCode.CONFIRM_TRANSACTION, 
commsSession, calculatedCRC);
+
+        FlowFileTransaction transaction = new FlowFileTransaction(session, 
context, stopWatch, bytesReceived, flowFilesReceived, calculatedCRC);
+        return commitReceiveTransaction(peer, transaction);
+    }
+
+    /**
+     * Finishes the receiving half of a transaction after the peer has been sent
+     * a CONFIRM_TRANSACTION response.
+     * <p>
+     * The peer's answer is read first: BAD_CHECKSUM rolls the session back and fails,
+     * CONFIRM_TRANSACTION lets processing continue, anything else is a protocol error.
+     * The session is committed before the final response is written to the peer.
+     *
+     * @param peer        the peer the data was received from
+     * @param transaction the session, context, stop watch and counters gathered while receiving
+     * @return the number of FlowFiles held by the transaction
+     * @throws IOException if the peer reports a bad checksum, answers with an unexpected
+     *                     response code, or communication fails
+     */
+    protected int commitReceiveTransaction(Peer peer, FlowFileTransaction transaction) throws IOException {
+        final CommunicationsSession commsSession = peer.getCommunicationsSession();
+        final ProcessSession processSession = transaction.getSession();
+
+        final Response peerAnswer = readTransactionResponse(false, commsSession);
+        logger.debug("{} Received {} from {}", this, peerAnswer, peer);
+
+        switch (peerAnswer.getCode()) {
+            case BAD_CHECKSUM:
+                // The peer computed a different CRC; discard everything received.
+                processSession.rollback();
+                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+            case CONFIRM_TRANSACTION:
+                break;
+            default:
+                throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + peerAnswer
+                        + "; expected 'Confirm Transaction' Response Code");
+        }
+
+        // Persist the data before telling the peer that it may discard its copy.
+        processSession.commit();
+
+        final boolean destinationFull = transaction.getContext().getAvailableRelationships().isEmpty();
+        if (!destinationFull) {
+            // Data was received; the peer can now discard it.
+            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+            writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED, commsSession);
+        } else {
+            // Data was received and the peer can discard it, but it should hold off sending more for a bit.
+            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+            writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL, commsSession);
+        }
+
+        // The transaction object names these "sent", but here they describe what was received.
+        final Set<FlowFile> received = transaction.getFlowFilesSent();
+        final long receivedBytes = transaction.getBytesSent();
+        final StopWatch timer = transaction.getStopWatch();
+        timer.stop();
+
+        final String description;
+        if (received.size() < 20) {
+            description = received.toString();
+        } else {
+            description = received.size() + " FlowFiles";
+        }
+        final String rate = timer.calculateDataRate(receivedBytes);
+        final long elapsedMillis = timer.getDuration(TimeUnit.MILLISECONDS);
+        final String size = FormatUtils.formatDataSize(receivedBytes);
+        logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+                this, description, size, peer, elapsedMillis, rate});
+
+        return received.size();
+    }
+
+    /**
+     * Marks this protocol instance as shut down. Only the flag is set here;
+     * the given peer is used solely for the debug log message.
+     *
+     * @param peer the peer this protocol was communicating with
+     */
+    @Override
+    public void shutdown(final Peer peer) {
+        logger.debug("{} Shutting down with {}", this, peer);
+        shutdown = true;
+    }
+
+    /**
+     * @return the current value of the {@code shutdown} flag
+     */
+    @Override
+    public boolean isShutdown() {
+        return shutdown;
+    }
+
+    /**
+     * Intentionally does nothing: this implementation ignores the supplied informant.
+     *
+     * @param nodeInformant ignored
+     */
+    @Override
+    public void setNodeInformant(final NodeInformant nodeInformant) {
+    }
+
+    /**
+     * @return the expiration, in milliseconds, stored in the handshaken properties
+     */
+    // NOTE(review): toString() guards against a null handshakenProperties but this
+    // method does not; presumably it is only called after a handshake - confirm.
+    @Override
+    public long getRequestExpiration() {
+        return handshakenProperties.getExpirationMillis();
+    }
+
+    /**
+     * Describes this protocol as {@code SimpleClassName[CommsID=...]}. The identifier is
+     * rendered as {@code null} while no handshaken properties are available.
+     */
+    @Override
+    public String toString() {
+        final String commsId;
+        if (handshakenProperties == null) {
+            commsId = null;
+        } else {
+            commsId = handshakenProperties.getCommsIdentifier();
+        }
+        return getClass().getSimpleName() + "[CommsID=" + commsId + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java
new file mode 100644
index 0000000..560cbaf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/FlowFileTransaction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.util.StopWatch;
+
+import java.util.Set;
+
+public class FlowFileTransaction {
+
+    private final ProcessSession session;
+    private final ProcessContext context;
+    private final StopWatch stopWatch;
+    private final long bytesSent;
+    private final Set<FlowFile> flowFilesSent;
+    private final String calculatedCRC;
+
+    public FlowFileTransaction() {
+        this(null, null, new StopWatch(true), 0, null,  null);
+    }
+
+    public FlowFileTransaction(ProcessSession session, ProcessContext context, 
StopWatch stopWatch, long bytesSent, Set<FlowFile> flowFilesSent, String 
calculatedCRC) {
+        this.session = session;
+        this.context = context;
+        this.stopWatch = stopWatch;
+        this.bytesSent = bytesSent;
+        this.flowFilesSent = flowFilesSent;
+        this.calculatedCRC = calculatedCRC;
+    }
+
+    public ProcessSession getSession() {
+        return session;
+    }
+
+    public StopWatch getStopWatch() {
+        return stopWatch;
+    }
+
+    public long getBytesSent() {
+        return bytesSent;
+    }
+
+    public Set<FlowFile> getFlowFilesSent() {
+        return flowFilesSent;
+    }
+
+    public String getCalculatedCRC() {
+        return calculatedCRC;
+    }
+
+    public ProcessContext getContext() {
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java
new file mode 100644
index 0000000..816689b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/HandshakenProperties.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import org.apache.nifi.remote.exception.HandshakeException;
+
+/**
+ * Mutable holder for the values settled during a site-to-site handshake:
+ * the communications identifier, the transit URI prefix, whether gzip is used,
+ * the request expiration and the requested batch limits.
+ */
+public class HandshakenProperties {
+
+    private String commsIdentifier;
+    private String transitUriPrefix = null;
+    private boolean useGzip;
+    private long expirationMillis;
+    // The batch limits default to 0; negative values are rejected by the setters.
+    // NOTE(review): 0 presumably means "no limit requested" - confirm against the callers.
+    private int batchCount = 0;
+    private long batchBytes = 0L;
+    private long batchDurationNanos = 0L;
+
+
+    /** @return the communications identifier, or {@code null} if not set */
+    public String getCommsIdentifier() {
+        return commsIdentifier;
+    }
+
+    /** @param commsIdentifier the communications identifier */
+    public void setCommsIdentifier(String commsIdentifier) {
+        this.commsIdentifier = commsIdentifier;
+    }
+
+    /** @return the transit URI prefix, or {@code null} if not set */
+    public String getTransitUriPrefix() {
+        return transitUriPrefix;
+    }
+
+    /** @param transitUriPrefix the transit URI prefix */
+    public void setTransitUriPrefix(String transitUriPrefix) {
+        this.transitUriPrefix = transitUriPrefix;
+    }
+
+    /** @return {@code true} if gzip compression was requested */
+    public boolean isUseGzip() {
+        return useGzip;
+    }
+
+    /**
+     * @param useGzip whether gzip compression was requested; {@code null} is treated as
+     *                {@code false} instead of failing while unboxing
+     */
+    public void setUseGzip(Boolean useGzip) {
+        this.useGzip = Boolean.TRUE.equals(useGzip);
+    }
+
+    /** @return the request expiration in milliseconds */
+    public long getExpirationMillis() {
+        return expirationMillis;
+    }
+
+    /** @param expirationMillis the request expiration in milliseconds */
+    public void setExpirationMillis(long expirationMillis) {
+        this.expirationMillis = expirationMillis;
+    }
+
+    /** @return the requested batch count */
+    public int getBatchCount() {
+        return batchCount;
+    }
+
+    /**
+     * @param batchCount the requested batch count; must not be negative
+     * @throws HandshakeException if {@code batchCount} is negative
+     */
+    public void setBatchCount(int batchCount) throws HandshakeException {
+        if (batchCount < 0) {
+            throw new HandshakeException("Cannot request Batch Count less than 0; requested value: " + batchCount);
+        }
+        this.batchCount = batchCount;
+    }
+
+    /** @return the requested batch size in bytes */
+    public long getBatchBytes() {
+        return batchBytes;
+    }
+
+    /**
+     * @param batchBytes the requested batch size in bytes; must not be negative
+     * @throws HandshakeException if {@code batchBytes} is negative
+     */
+    public void setBatchBytes(long batchBytes) throws HandshakeException {
+        if (batchBytes < 0) {
+            throw new HandshakeException("Cannot request Batch Size less than 0; requested value: " + batchBytes);
+        }
+        this.batchBytes = batchBytes;
+    }
+
+    /** @return the requested batch duration in nanoseconds */
+    public long getBatchDurationNanos() {
+        return batchDurationNanos;
+    }
+
+    /**
+     * @param batchDurationNanos the requested batch duration in nanoseconds; must not be negative
+     * @throws HandshakeException if {@code batchDurationNanos} is negative
+     */
+    public void setBatchDurationNanos(long batchDurationNanos) throws HandshakeException {
+        if (batchDurationNanos < 0) {
+            throw new HandshakeException("Cannot request Batch Duration less than 0; requested value: " + batchDurationNanos);
+        }
+        this.batchDurationNanos = batchDurationNanos;
+    }
+
+    /**
+     * Lists every property so that log statements printing this object show the
+     * handshaken values rather than an identity hash.
+     */
+    @Override
+    public String toString() {
+        return "HandshakenProperties[commsIdentifier=" + commsIdentifier
+                + ", transitUriPrefix=" + transitUriPrefix
+                + ", useGzip=" + useGzip
+                + ", expirationMillis=" + expirationMillis
+                + ", batchCount=" + batchCount
+                + ", batchBytes=" + batchBytes
+                + ", batchDurationNanos=" + batchDurationNanos + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java
new file mode 100644
index 0000000..69c0e66
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocol.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.protocol.ServerProtocol;
+
+import java.io.IOException;
+
+/**
+ * Server protocol extension that adds explicit commit operations for transfer and
+ * receive transactions, each addressed by peer.
+ */
+public interface HttpFlowFileServerProtocol extends ServerProtocol {
+
+    /**
+     * Commits a transfer transaction for the given peer using the checksum supplied by the client.
+     * NOTE(review): the return value is presumably the number of flow files in the
+     * transaction, and IllegalStateException presumably signals that no matching
+     * transaction exists - confirm against the implementation.
+     *
+     * @param peer           the peer whose transaction is committed
+     * @param clientChecksum the checksum reported by the client
+     */
+    int commitTransferTransaction(Peer peer, String clientChecksum) throws 
IOException, IllegalStateException;
+
+    /**
+     * Commits a receive transaction for the given peer.
+     * NOTE(review): return value and IllegalStateException semantics presumably mirror
+     * commitTransferTransaction - confirm against the implementation.
+     *
+     * @param peer the peer whose transaction is committed
+     */
+    int commitReceiveTransaction(Peer peer) throws IOException, 
IllegalStateException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
new file mode 100644
index 0000000..f187625
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/HttpFlowFileServerProtocolImpl.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class HttpFlowFileServerProtocolImpl extends 
AbstractFlowFileServerProtocol implements HttpFlowFileServerProtocol {
+
+    public static final String RESOURCE_NAME = "HttpFlowFileProtocol";
+
+    private final FlowFileCodec codec = new StandardFlowFileCodec();
+    private final VersionNegotiator versionNegotiator;
+    private final HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
+
+    public HttpFlowFileServerProtocolImpl(VersionNegotiator versionNegotiator) 
{
+        super();
+        this.versionNegotiator = versionNegotiator;
+    }
+
+    @Override
+    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException {
+        return codec;
+    }
+
+    @Override
+    public FlowFileCodec getPreNegotiatedCodec() {
+        return codec;
+    }
+
+    @Override
+    protected HandshakenProperties doHandshake(Peer peer) throws IOException, 
HandshakeException {
+        HandshakenProperties confirmed = new HandshakenProperties();
+
+        HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        confirmed.setCommsIdentifier(commsSession.getTransactionId());
+        validateHandshakeRequest(confirmed, peer, 
commsSession.getHandshakeParams());
+
+        logger.debug("{} Done handshake, confirmed={}", this, confirmed);
+        return confirmed;
+    }
+
+    @Override
+    protected void writeTransactionResponse(boolean isTransfer, ResponseCode 
response, CommunicationsSession commsSession, String explanation) throws 
IOException {
+        HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) commsSession;
+
+        commSession.setResponseCode(response);
+        if(isTransfer){
+            switch (response) {
+                case NO_MORE_DATA:
+                    logger.debug("{} There's no data to send.", this);
+                    break;
+                case CONTINUE_TRANSACTION:
+                    logger.debug("{} Continue transaction... expecting more 
flow files.", this);
+                    
commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+                    break;
+                case BAD_CHECKSUM:
+                    logger.debug("{} Received BAD_CHECKSUM.", this);
+                    commSession.setStatus(Transaction.TransactionState.ERROR);
+                    break;
+                case CONFIRM_TRANSACTION:
+                    logger.debug("{} Transaction is confirmed.", this);
+                    
commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+                    break;
+                case FINISH_TRANSACTION:
+                    logger.debug("{} transaction is completed.", this);
+                    
commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+                    break;
+            }
+        } else {
+            switch (response) {
+                case CONFIRM_TRANSACTION:
+                    logger.debug("{} Confirming transaction. checksum={}", 
this, explanation);
+                    commSession.setChecksum(explanation);
+                    
commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+                    break;
+                case TRANSACTION_FINISHED:
+                case TRANSACTION_FINISHED_BUT_DESTINATION_FULL:
+                    logger.debug("{} Transaction is completed. 
responseCode={}", this, response);
+                    
commSession.setStatus(Transaction.TransactionState.TRANSACTION_COMPLETED);
+                    break;
+            }
+        }
+    }
+
+    @Override
+    protected Response readTransactionResponse(boolean isTransfer, 
CommunicationsSession commsSession) throws IOException {
+        // Returns Response based on current status.
+        HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) commsSession;
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        Transaction.TransactionState currentStatus = commSession.getStatus();
+        if(isTransfer){
+            switch (currentStatus){
+                case DATA_EXCHANGED:
+                    String clientChecksum = commSession.getChecksum();
+                    logger.debug("readTransactionResponse. clientChecksum={}", 
clientChecksum);
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(new 
DataOutputStream(bos), clientChecksum);
+                    break;
+                case TRANSACTION_CONFIRMED:
+                    logger.debug("readTransactionResponse. finishing.");
+                    ResponseCode.TRANSACTION_FINISHED.writeResponse(new 
DataOutputStream(bos));
+                    break;
+            }
+        } else {
+            switch (currentStatus){
+                case TRANSACTION_STARTED:
+                    logger.debug("readTransactionResponse. returning 
CONTINUE_TRANSACTION.");
+                    // We don't know if there's more data to receive, so just 
continue it.
+                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(new 
DataOutputStream(bos));
+                    break;
+                case TRANSACTION_CONFIRMED:
+                    // Checksum was successfully validated at client side, or 
BAD_CHECKSUM is returned.
+                    ResponseCode responseCode = commSession.getResponseCode();
+                    logger.debug("readTransactionResponse. responseCode={}", 
responseCode);
+                    if(responseCode.containsMessage()){
+                        responseCode.writeResponse(new DataOutputStream(bos), 
"");
+                    } else {
+                        responseCode.writeResponse(new DataOutputStream(bos));
+                    }
+                    break;
+            }
+        }
+
+        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+        return Response.read(new DataInputStream(bis));
+    }
+
+    private int holdTransaction(Peer peer, FlowFileTransaction transaction) {
+        // We don't commit the session here yet,
+        // to avoid losing sent flow files in case some issue happens at 
client side while it is processing,
+        // hold the transaction until we confirm additional request from 
client.
+        HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        logger.debug("{} Holding transaction. transactionId={}", this, 
transactionId);
+        transactionManager.holdTransaction(transactionId, transaction);
+
+        return transaction.getFlowFilesSent().size();
+    }
+
+    @Override
+    protected int commitTransferTransaction(Peer peer, FlowFileTransaction 
transaction) throws IOException {
+        return holdTransaction(peer, transaction);
+    }
+
+    public int commitTransferTransaction(Peer peer, String clientChecksum) 
throws IOException, IllegalStateException {
+        logger.debug("{} Committing the transfer transaction. peer={} 
clientChecksum={}", this, peer, clientChecksum);
+        HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        FlowFileTransaction transaction = 
transactionManager.finalizeTransaction(transactionId);
+        commSession.setChecksum(clientChecksum);
+        commSession.setStatus(Transaction.TransactionState.DATA_EXCHANGED);
+        return super.commitTransferTransaction(peer, transaction);
+    }
+
+    @Override
+    protected int commitReceiveTransaction(Peer peer, FlowFileTransaction 
transaction) throws IOException {
+        return holdTransaction(peer, transaction);
+    }
+
+    public int commitReceiveTransaction(Peer peer) throws IOException, 
IllegalStateException {
+        logger.debug("{} Committing the receive transaction. peer={}", this, 
peer);
+        HttpServerCommunicationsSession commSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        String transactionId = commSession.getTransactionId();
+        FlowFileTransaction transaction = 
transactionManager.finalizeTransaction(transactionId);
+        
commSession.setStatus(Transaction.TransactionState.TRANSACTION_CONFIRMED);
+        return super.commitReceiveTransaction(peer, transaction);
+    }
+
+    @Override
+    public RequestType getRequestType(final Peer peer) throws IOException {
+        return null;
+    }
+
+    @Override
+    public VersionNegotiator getVersionNegotiator() {
+        return versionNegotiator;
+    }
+
+    @Override
+    public void sendPeerList(final Peer peer) throws IOException {
+    }
+
+    @Override
+    public String getResourceName() {
+        return RESOURCE_NAME;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index ef7a61c..af6860b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -34,7 +34,9 @@ import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Reply via email to