http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 22ca29f..a2a7223 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -16,106 +16,50 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PortAuthorizationResult;
 import org.apache.nifi.remote.RemoteResourceFactory;
-import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
 import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.protocol.ServerProtocol;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SocketFlowFileServerProtocol implements ServerProtocol {
-
-    public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
 
-    private ProcessGroup rootGroup;
-    private String commsIdentifier;
-    private boolean handshakeCompleted;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
 
-    private Boolean useGzip;
-    private long requestExpirationMillis;
-    private RootGroupPort port;
-    private boolean shutdown = false;
-    private FlowFileCodec negotiatedFlowFileCodec = null;
-    private String transitUriPrefix = null;
+public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol {
 
-    private int requestedBatchCount = 0;
-    private long requestedBatchBytes = 0L;
-    private long requestedBatchNanos = 0L;
-    private static final long DEFAULT_BATCH_NANOS = 
TimeUnit.SECONDS.toNanos(5L);
+    public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
 
     private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
-    private final Logger logger = 
LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
 
     @Override
-    public void setRootProcessGroup(final ProcessGroup group) {
-        if (!group.isRootGroup()) {
-            throw new IllegalArgumentException();
-        }
-        this.rootGroup = group;
-    }
+    protected HandshakenProperties doHandshake(Peer peer) throws IOException, 
HandshakeException {
 
-    @Override
-    public void handshake(final Peer peer) throws IOException, 
HandshakeException {
-        if (handshakeCompleted) {
-            throw new IllegalStateException("Handshake has already been 
completed");
-        }
-        if (shutdown) {
-            throw new IllegalStateException("Protocol is shutdown");
-        }
+        HandshakenProperties confirmed = new HandshakenProperties();
 
-        logger.debug("{} Handshaking with {}", this, peer);
         final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
 
-        commsIdentifier = dis.readUTF();
+        confirmed.setCommsIdentifier(dis.readUTF());
 
         if (versionNegotiator.getVersion() >= 3) {
-            transitUriPrefix = dis.readUTF();
+            String transitUriPrefix = dis.readUTF();
             if (!transitUriPrefix.endsWith("/")) {
                 transitUriPrefix = transitUriPrefix + "/";
             }
+            confirmed.setTransitUriPrefix(transitUriPrefix);
         }
 
         final Map<String, String> properties = new HashMap<>();
@@ -128,128 +72,33 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
 
         // evaluate the properties received
         boolean responseWritten = false;
-        for (final Map.Entry<String, String> entry : properties.entrySet()) {
-            final String propertyName = entry.getKey();
-            final String value = entry.getValue();
 
-            final HandshakeProperty property;
-            try {
-                property = HandshakeProperty.valueOf(propertyName);
-            } catch (final Exception e) {
-                ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown 
Property Name: " + propertyName);
-                throw new HandshakeException("Received unknown property: " + 
propertyName);
+        try {
+            validateHandshakeRequest(confirmed, peer, properties);
+        } catch (HandshakeException e) {
+            ResponseCode handshakeResult = e.getResponseCode();
+            if(handshakeResult.containsMessage()){
+                handshakeResult.writeResponse(dos, e.getMessage());
+            } else {
+                handshakeResult.writeResponse(dos);
             }
-
-            try {
-                switch (property) {
-                    case GZIP: {
-                        useGzip = Boolean.parseBoolean(value);
-                        break;
-                    }
-                    case REQUEST_EXPIRATION_MILLIS:
-                        requestExpirationMillis = Long.parseLong(value);
-                        break;
-                    case BATCH_COUNT:
-                        requestedBatchCount = Integer.parseInt(value);
-                        if (requestedBatchCount < 0) {
-                            throw new HandshakeException("Cannot request Batch 
Count less than 1; requested value: " + value);
-                        }
-                        break;
-                    case BATCH_SIZE:
-                        requestedBatchBytes = Long.parseLong(value);
-                        if (requestedBatchBytes < 0) {
-                            throw new HandshakeException("Cannot request Batch 
Size less than 1; requested value: " + value);
-                        }
-                        break;
-                    case BATCH_DURATION:
-                        requestedBatchNanos = 
TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
-                        if (requestedBatchNanos < 0) {
-                            throw new HandshakeException("Cannot request Batch 
Duration less than 1; requested value: " + value);
-                        }
-                        break;
-                    case PORT_IDENTIFIER: {
-                        Port receivedPort = rootGroup.getInputPort(value);
-                        if (receivedPort == null) {
-                            receivedPort = rootGroup.getOutputPort(value);
-                        }
-                        if (receivedPort == null) {
-                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                            throw new HandshakeException("Received unknown 
port identifier: " + value);
-                        }
-                        if (!(receivedPort instanceof RootGroupPort)) {
-                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                            throw new HandshakeException("Received port 
identifier " + value + ", but this Port is not a RootGroupPort");
-                        }
-
-                        this.port = (RootGroupPort) receivedPort;
-                        final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
-                        if (!portAuthResult.isAuthorized()) {
-                            logger.debug("Responding with ResponseCode 
UNAUTHORIZED: ", portAuthResult.getExplanation());
-                            ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation());
-                            responseWritten = true;
-                            break;
-                        }
-
-                        if (!receivedPort.isValid()) {
-                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
-                            responseWritten = true;
-                            break;
-                        }
-
-                        if (!receivedPort.isRunning()) {
-                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
-                            responseWritten = true;
-                            break;
-                        }
-
-                        // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this
-                        // we we will simply not service the request but the 
sender will timeout
-                        if (getVersionNegotiator().getVersion() > 1) {
-                            for (final Connection connection : 
port.getConnections()) {
-                                if (connection.getFlowFileQueue().isFull()) {
-                                    logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", receivedPort);
-                                    
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
-                                    responseWritten = true;
-                                    break;
-                                }
-                            }
-                        }
-
-                        break;
-                    }
-                }
-            } catch (final NumberFormatException nfe) {
-                throw new HandshakeException("Received invalid value for 
property '" + property + "'; invalid value: " + value);
+            switch (handshakeResult) {
+                case UNAUTHORIZED:
+                case PORT_NOT_IN_VALID_STATE:
+                case PORTS_DESTINATION_FULL:
+                    responseWritten = true;
+                    break;
+                default:
+                    throw e;
             }
         }
 
-        if (useGzip == null) {
-            logger.debug("Responding with ResponseCode MISSING_PROPERTY 
because GZIP Property missing");
-            ResponseCode.MISSING_PROPERTY.writeResponse(dos, 
HandshakeProperty.GZIP.name());
-            throw new HandshakeException("Missing Property " + 
HandshakeProperty.GZIP.name());
-        }
-
         // send "OK" response
         if (!responseWritten) {
             ResponseCode.PROPERTIES_OK.writeResponse(dos);
         }
 
-        logger.debug("{} Finished handshake with {}", this, peer);
-        handshakeCompleted = true;
-    }
-
-    @Override
-    public boolean isHandshakeSuccessful() {
-        return handshakeCompleted;
-    }
-
-    @Override
-    public RootGroupPort getPort() {
-        return port;
+        return confirmed;
     }
 
     @Override
@@ -280,290 +129,6 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
         }
     }
 
-    @Override
-    public FlowFileCodec getPreNegotiatedCodec() {
-        return negotiatedFlowFileCodec;
-    }
-
-    @Override
-    public int transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        if (!handshakeCompleted) {
-            throw new IllegalStateException("Handshake has not been 
completed");
-        }
-        if (shutdown) {
-            throw new IllegalStateException("Protocol is shutdown");
-        }
-
-        logger.debug("{} Sending FlowFiles to {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String remoteDn = commsSession.getUserDn();
-        if (remoteDn == null) {
-            remoteDn = "none";
-        }
-
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            // we have no data to send. Notify the peer.
-            logger.debug("{} No data to send to {}", this, peer);
-            ResponseCode.NO_MORE_DATA.writeResponse(dos);
-            return 0;
-        }
-
-        // we have data to send.
-        logger.debug("{} Data is available to send to {}", this, peer);
-        ResponseCode.MORE_DATA.writeResponse(dos);
-
-        final StopWatch stopWatch = new StopWatch(true);
-        long bytesSent = 0L;
-        final Set<FlowFile> flowFilesSent = new HashSet<>();
-        final CRC32 crc = new CRC32();
-
-        // send data until we reach some batch size
-        boolean continueTransaction = true;
-        final long startNanos = System.nanoTime();
-        String calculatedCRC = "";
-        while (continueTransaction) {
-            final OutputStream flowFileOutputStream = useGzip ? new 
CompressionOutputStream(dos) : dos;
-            logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, 
peer});
-
-            final CheckedOutputStream checkedOutputStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
-
-            final StopWatch transferWatch = new StopWatch(true);
-
-            final FlowFile toSend = flowFile;
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    final DataPacket dataPacket = new 
StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
-                    codec.encode(dataPacket, checkedOutputStream);
-                }
-            });
-
-            final long transmissionMillis = 
transferWatch.getElapsed(TimeUnit.MILLISECONDS);
-
-            // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
-            // Otherwise, do NOT close it because we don't want to close the 
underlying stream
-            // (CompressionOutputStream will not close the underlying stream 
when it's closed)
-            if (useGzip) {
-                checkedOutputStream.close();
-            }
-
-            flowFilesSent.add(flowFile);
-            bytesSent += flowFile.getSize();
-
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
-            session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
-            session.remove(flowFile);
-
-            // determine if we should check for more data on queue.
-            final long sendingNanos = System.nanoTime() - startNanos;
-            boolean poll = true;
-            if (sendingNanos >= requestedBatchNanos && requestedBatchNanos > 
0L) {
-                poll = false;
-            }
-            if (bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L) {
-                poll = false;
-            }
-            if (flowFilesSent.size() >= requestedBatchCount && 
requestedBatchCount > 0) {
-                poll = false;
-            }
-
-            if (requestedBatchNanos == 0 && requestedBatchBytes == 0 && 
requestedBatchCount == 0) {
-                poll = (sendingNanos < DEFAULT_BATCH_NANOS);
-            }
-
-            if (poll) {
-                // we've not elapsed the requested sending duration, so get 
more data.
-                flowFile = session.get();
-            } else {
-                flowFile = null;
-            }
-
-            continueTransaction = (flowFile != null);
-            if (continueTransaction) {
-                logger.debug("{} Sending ContinueTransaction indicator to {}", 
this, peer);
-                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-            } else {
-                logger.debug("{} Sending FinishTransaction indicator to {}", 
this, peer);
-                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                calculatedCRC = 
String.valueOf(checkedOutputStream.getChecksum().getValue());
-            }
-        }
-
-        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to 
send a 'Confirm Transaction' response
-        final Response transactionConfirmationResponse = Response.read(dis);
-        if (transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION) {
-            // Confirm Checksum and echo back the confirmation.
-            logger.debug("{} Received {}  from {}", this, 
transactionConfirmationResponse, peer);
-            final String receivedCRC = 
transactionConfirmationResponse.getMessage();
-
-            if (versionNegotiator.getVersion() > 3) {
-                if (!receivedCRC.equals(calculatedCRC)) {
-                    ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                    session.rollback();
-                    throw new IOException(this + " Sent data to peer " + peer 
+ " but calculated CRC32 Checksum as "
-                            + calculatedCRC + " while peer calculated CRC32 
Checksum as " + receivedCRC
-                            + "; canceling transaction and rolling back 
session");
-                }
-            }
-
-            ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
-        } else {
-            throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
-        }
-
-        final String flowFileDescription = flowFilesSent.size() < 20 ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-
-        final Response transactionResponse;
-        try {
-            transactionResponse = Response.read(dis);
-        } catch (final IOException e) {
-            logger.error("{} Failed to receive a response from {} when 
expecting a TransactionFinished Indicator."
-                    + " It is unknown whether or not the peer successfully 
received/processed the data."
-                    + " Therefore, {} will be rolled back, possibly resulting 
in data duplication of {}",
-                    this, peer, session, flowFileDescription);
-            session.rollback();
-            throw e;
-        }
-
-        logger.debug("{} received {} from {}", new Object[]{this, 
transactionResponse, peer});
-        if (transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
-            peer.penalize(port.getIdentifier(), 
port.getYieldPeriod(TimeUnit.MILLISECONDS));
-        } else if (transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED) {
-            throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
-        }
-
-        session.commit();
-
-        stopWatch.stop();
-        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesSent);
-        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at 
a rate of {}", new Object[]{
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-
-        return flowFilesSent.size();
-    }
-
-    @Override
-    public int receiveFlowFiles(final Peer peer, final ProcessContext context, 
final ProcessSession session, final FlowFileCodec codec) throws IOException, 
ProtocolException {
-        if (!handshakeCompleted) {
-            throw new IllegalStateException("Handshake has not been 
completed");
-        }
-        if (shutdown) {
-            throw new IllegalStateException("Protocol is shutdown");
-        }
-
-        logger.debug("{} receiving FlowFiles from {}", this, peer);
-
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String remoteDn = commsSession.getUserDn();
-        if (remoteDn == null) {
-            remoteDn = "none";
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-        final CRC32 crc = new CRC32();
-
-        // Peer has data. Otherwise, we would not have been called, because 
they would not have sent
-        // a SEND_FLOWFILES request to use. Just decode the bytes into 
FlowFiles until peer says he's
-        // finished sending data.
-        final Set<FlowFile> flowFilesReceived = new HashSet<>();
-        long bytesReceived = 0L;
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        while (continueTransaction) {
-            final long startNanos = System.nanoTime();
-            final InputStream flowFileInputStream = useGzip ? new 
CompressionInputStream(dis) : dis;
-            final CheckedInputStream checkedInputStream = new 
CheckedInputStream(flowFileInputStream, crc);
-
-            final DataPacket dataPacket = codec.decode(checkedInputStream);
-            FlowFile flowFile = session.create();
-            flowFile = session.importFrom(dataPacket.getData(), flowFile);
-            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
-
-            final long transferNanos = System.nanoTime() - startNanos;
-            final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-            final String sourceSystemFlowFileUuid = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, 
CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
-            session.getProvenanceReporter().receive(flowFile, transitUri, 
sourceSystemFlowFileUuid == null
-                    ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
-            session.transfer(flowFile, Relationship.ANONYMOUS);
-            flowFilesReceived.add(flowFile);
-            bytesReceived += flowFile.getSize();
-
-            final Response transactionResponse = Response.read(dis);
-            switch (transactionResponse.getCode()) {
-                case CONTINUE_TRANSACTION:
-                    logger.debug("{} Received ContinueTransaction indicator 
from {}", this, peer);
-                    break;
-                case FINISH_TRANSACTION:
-                    logger.debug("{} Received FinishTransaction indicator from 
{}", this, peer);
-                    continueTransaction = false;
-                    calculatedCRC = 
String.valueOf(checkedInputStream.getChecksum().getValue());
-                    break;
-                case CANCEL_TRANSACTION:
-                    logger.info("{} Received CancelTransaction indicator from 
{} with explanation {}", this, peer, transactionResponse.getMessage());
-                    session.rollback();
-                    return 0;
-                default:
-                    throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" 
+ transactionResponse);
-            }
-        }
-
-        // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-        // to peer so that we can verify that the connection is still open. 
This is a two-phase commit,
-        // which helps to prevent the chances of data duplication. Without 
doing this, we may commit the
-        // session and then when we send the response back to the peer, the 
peer may have timed out and may not
-        // be listening. As a result, it will re-send the data. By doing this 
two-phase commit, we narrow the
-        // Critical Section involved in this transaction so that rather than 
the Critical Section being the
-        // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
-        logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", 
this, peer);
-        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-
-        final Response confirmTransactionResponse = Response.read(dis);
-        logger.debug("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
-
-        switch (confirmTransactionResponse.getCode()) {
-            case CONFIRM_TRANSACTION:
-                break;
-            case BAD_CHECKSUM:
-                session.rollback();
-                throw new IOException(this + " Received a BadChecksum response 
from peer " + peer);
-            default:
-                throw new ProtocolException(this + " Received unexpected 
Response Code from peer " + peer + " : " + confirmTransactionResponse + "; 
expected 'Confirm Transaction' Response Code");
-        }
-
-        // Commit the session so that we have persisted the data
-        session.commit();
-
-        if (context.getAvailableRelationships().isEmpty()) {
-            // Confirm that we received the data and the peer can now discard 
it but that the peer should not
-            // send any more data for a bit
-            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL 
to {}", this, peer);
-            
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
-        } else {
-            // Confirm that we received the data and the peer can now discard 
it
-            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
-            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-        }
-
-        stopWatch.stop();
-        final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-        final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-        logger.info("{} Successfully received {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[]{
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-
-        return flowFilesReceived.size();
-    }
 
     @Override
     public RequestType getRequestType(final Peer peer) throws IOException {
@@ -582,22 +147,6 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
     }
 
     @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-
-    @Override
-    public void shutdown(final Peer peer) {
-        logger.debug("{} Shutting down with {}", this, peer);
-        shutdown = true;
-    }
-
-    @Override
-    public boolean isShutdown() {
-        return shutdown;
-    }
-
-    @Override
     public void sendPeerList(final Peer peer) throws IOException {
         if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been 
completed");
@@ -632,17 +181,9 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
         return RESOURCE_NAME;
     }
 
-    @Override
-    public void setNodeInformant(final NodeInformant nodeInformant) {
-    }
 
     @Override
-    public long getRequestExpiration() {
-        return requestExpirationMillis;
-    }
-
-    @Override
-    public String toString() {
-        return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]";
+    public VersionNegotiator getVersionNegotiator() {
+        return versionNegotiator;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
new file mode 100644
index 0000000..ded2042
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestHttpRemoteSiteListener {
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+    }
+
+    @Test
+    public void testNormalTransactionProgress() {
+        HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
+        String transactionId = transactionManager.createTransaction();
+
+        assertTrue("Transaction should be active.", 
transactionManager.isTransactionActive(transactionId));
+
+        ProcessSession processSession = Mockito.mock(ProcessSession.class);
+        FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
+        transactionManager.holdTransaction(transactionId, transaction);
+
+        transaction = transactionManager.finalizeTransaction(transactionId);
+        assertNotNull(transaction);
+
+        assertFalse("Transaction should not be active anymore.", 
transactionManager.isTransactionActive(transactionId));
+
+    }
+
+    @Test
+    public void testDuplicatedTransactionId() {
+        HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
+        String transactionId = transactionManager.createTransaction();
+
+        assertTrue("Transaction should be active.", 
transactionManager.isTransactionActive(transactionId));
+
+        ProcessSession processSession = Mockito.mock(ProcessSession.class);
+        FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
+        transactionManager.holdTransaction(transactionId, transaction);
+
+        try {
+            transactionManager.holdTransaction(transactionId, transaction);
+            fail("The same transaction id can't hold another transaction");
+        } catch (IllegalStateException e) {
+        }
+
+    }
+
+    @Test
+    public void testNoneExistingTransaction() {
+        HttpRemoteSiteListener transactionManager = 
HttpRemoteSiteListener.getInstance();
+
+        String transactionId = "does-not-exist-1";
+        assertFalse("Transaction should not be active.", 
transactionManager.isTransactionActive(transactionId));
+
+        ProcessSession processSession = Mockito.mock(ProcessSession.class);
+        FlowFileTransaction transaction = new 
FlowFileTransaction(processSession, null, null, 0, null, null);
+        try {
+            transactionManager.holdTransaction(transactionId, transaction);
+        } catch (IllegalStateException e) {
+            fail("Transaction can be held even if the transaction id is not 
valid anymore," +
+                    " in order to support large file or slow network.");
+        }
+
+        transactionId = "does-not-exist-2";
+        try {
+            transactionManager.finalizeTransaction(transactionId);
+            fail("But transaction should not be finalized if it isn't 
active.");
+        } catch (IllegalStateException e) {
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
new file mode 100644
index 0000000..a8900c9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.PortAuthorizationResult;
+import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.io.http.HttpInput;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestHttpFlowFileServerProtocol {
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+    }
+
+    private Peer getDefaultPeer() {
+        return getDefaultPeer(null);
+    }
+
+    private Peer getDefaultPeer(final String transactionId) {
+        final PeerDescription description = new PeerDescription("peer-host", 
8080, false);
+        final InputStream inputStream = new ByteArrayInputStream(new byte[]{});
+        final OutputStream outputStream = new ByteArrayOutputStream();
+        final HttpServerCommunicationsSession commsSession = new 
HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+        commsSession.putHandshakeParam(HandshakeProperty.GZIP, "false");
+        
commsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, 
"1234");
+        final String peerUrl = "http://peer-host:8080/";;
+        final String clusterUrl = "cluster-url";
+        return new Peer(description, commsSession, peerUrl, clusterUrl);
+    }
+
+    private HttpFlowFileServerProtocol getDefaultHttpFlowFileServerProtocol() {
+        final StandardVersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
+        return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+    }
+
+    @Test
+    public void testIllegalHandshakeProperty() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+        final Peer peer = getDefaultPeer();
+        
((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getHandshakeParams().clear();
+        try {
+            serverProtocol.handshake(peer);
+            fail();
+        } catch (HandshakeException e) {
+            assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode());
+        }
+
+        assertFalse(serverProtocol.isHandshakeSuccessful());
+    }
+
+    @Test
+    public void testUnknownPort() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+        final Peer peer = getDefaultPeer();
+        ((HttpServerCommunicationsSession)peer.getCommunicationsSession())
+                .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
"port-identifier");
+
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+        doReturn(true).when(processGroup).isRootGroup();
+
+        serverProtocol.setRootProcessGroup(processGroup);
+        try {
+            serverProtocol.handshake(peer);
+            fail();
+        } catch (HandshakeException e) {
+            assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode());
+        }
+
+        assertFalse(serverProtocol.isHandshakeSuccessful());
+    }
+
+    @Test
+    public void testUnauthorized() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+        final Peer peer = getDefaultPeer();
+        ((HttpServerCommunicationsSession)peer.getCommunicationsSession())
+                .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
"port-identifier");
+
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+        final RootGroupPort port = mock(RootGroupPort.class);
+        final PortAuthorizationResult authResult = 
mock(PortAuthorizationResult.class);
+        doReturn(true).when(processGroup).isRootGroup();
+        doReturn(port).when(processGroup).getOutputPort("port-identifier");
+        
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
+
+        serverProtocol.setRootProcessGroup(processGroup);
+        try {
+            serverProtocol.handshake(peer);
+            fail();
+        } catch (HandshakeException e) {
+            assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode());
+        }
+
+        assertFalse(serverProtocol.isHandshakeSuccessful());
+    }
+
+    @Test
+    public void testPortNotInValidState() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+        final Peer peer = getDefaultPeer();
+        ((HttpServerCommunicationsSession)peer.getCommunicationsSession())
+                .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
"port-identifier");
+
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+        final RootGroupPort port = mock(RootGroupPort.class);
+        final PortAuthorizationResult authResult = 
mock(PortAuthorizationResult.class);
+        doReturn(true).when(processGroup).isRootGroup();
+        doReturn(port).when(processGroup).getOutputPort("port-identifier");
+        
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
+        doReturn(true).when(authResult).isAuthorized();
+
+        serverProtocol.setRootProcessGroup(processGroup);
+        try {
+            serverProtocol.handshake(peer);
+            fail();
+        } catch (HandshakeException e) {
+            assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, 
e.getResponseCode());
+        }
+
+        assertFalse(serverProtocol.isHandshakeSuccessful());
+    }
+
+    @Test
+    public void testPortDestinationFull() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+        final Peer peer = getDefaultPeer();
+        ((HttpServerCommunicationsSession)peer.getCommunicationsSession())
+                .putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, 
"port-identifier");
+
+        final ProcessGroup processGroup = mock(ProcessGroup.class);
+        final RootGroupPort port = mock(RootGroupPort.class);
+        final PortAuthorizationResult authResult = 
mock(PortAuthorizationResult.class);
+        doReturn(true).when(processGroup).isRootGroup();
+        doReturn(port).when(processGroup).getOutputPort("port-identifier");
+        
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
+        doReturn(true).when(authResult).isAuthorized();
+        doReturn(true).when(port).isValid();
+        doReturn(true).when(port).isRunning();
+        Set<Connection> connections = new HashSet<>();
+        final Connection connection = mock(Connection.class);
+        connections.add(connection);
+        doReturn(connections).when(port).getConnections();
+        final FlowFileQueue flowFileQueue = mock(FlowFileQueue.class);
+        doReturn(flowFileQueue).when(connection).getFlowFileQueue();
+        doReturn(true).when(flowFileQueue).isFull();
+
+        serverProtocol.setRootProcessGroup(processGroup);
+        try {
+            serverProtocol.handshake(peer);
+            fail();
+        } catch (HandshakeException e) {
+            assertEquals(ResponseCode.PORTS_DESTINATION_FULL, 
e.getResponseCode());
+        }
+
+        assertFalse(serverProtocol.isHandshakeSuccessful());
+    }
+
+    @Test
+    public void testShutdown() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final Peer peer = getDefaultPeer();
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        assertTrue(negotiatedCoded instanceof StandardFlowFileCodec);
+
+        assertEquals(negotiatedCoded, serverProtocol.getPreNegotiatedCodec());
+        assertEquals(1234, serverProtocol.getRequestExpiration());
+
+        serverProtocol.shutdown(peer);
+
+        final ProcessContext context = null;
+        final ProcessSession processSession = null;
+        try {
+            serverProtocol.transferFlowFiles(peer, context, processSession, 
negotiatedCoded);
+            fail("transferFlowFiles should fail since it's already shutdown.");
+        } catch (IllegalStateException e) {
+        }
+
+        try {
+            serverProtocol.receiveFlowFiles(peer, context, processSession, 
negotiatedCoded);
+            fail("receiveFlowFiles should fail since it's already shutdown.");
+        } catch (IllegalStateException e) {
+        }
+    }
+
+    @Test
+    public void testTransferZeroFile() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final Peer peer = getDefaultPeer();
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = null;
+        final ProcessSession processSession = mock(ProcessSession.class);
+
+        // Execute test using mock
+        final int flowFileSent = serverProtocol.transferFlowFiles(peer, 
context, processSession, negotiatedCoded);
+        assertEquals(0, flowFileSent);
+    }
+
+    @Test
+    public void testTransferOneFile() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testTransferOneFile";
+        final Peer peer = transferOneFile(serverProtocol, transactionId);
+
+        // Commit transaction
+        final int flowFileSent = 
serverProtocol.commitTransferTransaction(peer, "2077607535");
+        assertEquals(1, flowFileSent);
+    }
+
+    @Test
+    public void testTransferOneFileBadChecksum() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testTransferOneFileBadChecksum";
+        final Peer peer = transferOneFile(serverProtocol, transactionId);
+
+        // Commit transaction
+        try {
+            serverProtocol.commitTransferTransaction(peer, 
"client-sent-wrong-checksum");
+            fail();
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("CRC32 Checksum"));
+        }
+    }
+
+    private Peer transferOneFile(HttpFlowFileServerProtocol serverProtocol, 
String transactionId) throws IOException {
+        final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
+        commsSession.setUserDn("unit-test");
+
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = mock(ProcessContext.class);
+        final ProcessSession processSession = mock(ProcessSession.class);
+        final ProvenanceReporter provenanceReporter = 
mock(ProvenanceReporter.class);
+        final FlowFile flowFile = mock(FlowFile.class);
+        doReturn(flowFile).when(processSession).get();
+        
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+        doAnswer(invocation -> {
+            String peerUrl = (String)invocation.getArguments()[1];
+            String detail = (String)invocation.getArguments()[2];
+            assertEquals("http://peer-host:8080/";, peerUrl);
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
+            return null;
+        }).when(provenanceReporter).send(eq(flowFile), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
+
+        doAnswer(invocation -> {
+            InputStreamCallback callback = 
(InputStreamCallback)invocation.getArguments()[1];
+            callback.process(new java.io.ByteArrayInputStream("Server 
content".getBytes()));
+            return null;
+        }).when(processSession).read(any(FlowFile.class), 
any(InputStreamCallback.class));
+
+        // Execute test using mock
+        int flowFileSent = serverProtocol.transferFlowFiles(peer, context, 
processSession, negotiatedCoded);
+        assertEquals(1, flowFileSent);
+
+        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+        return peer;
+    }
+
+    @Test
+    public void testTransferTwoFiles() throws Exception {
+        final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testTransferTwoFiles";
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
+        commsSession.setUserDn("unit-test");
+
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = mock(ProcessContext.class);
+        final ProcessSession processSession = mock(ProcessSession.class);
+        final ProvenanceReporter provenanceReporter = 
mock(ProvenanceReporter.class);
+        final FlowFile flowFile1 = mock(FlowFile.class);
+        final FlowFile flowFile2 = mock(FlowFile.class);
+        doReturn(flowFile1)
+                .doReturn(flowFile2)
+                .when(processSession).get();
+
+        
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+        doAnswer(invocation -> {
+            String peerUrl = (String)invocation.getArguments()[1];
+            String detail = (String)invocation.getArguments()[2];
+            assertEquals("http://peer-host:8080/";, peerUrl);
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
+            return null;
+        }).when(provenanceReporter).send(eq(flowFile1), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
+
+        
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+        doAnswer(invocation -> {
+            String peerUrl = (String)invocation.getArguments()[1];
+            String detail = (String)invocation.getArguments()[2];
+            assertEquals("http://peer-host:8080/";, peerUrl);
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
+            return null;
+        }).when(provenanceReporter).send(eq(flowFile2), any(String.class), 
any(String.class), any(Long.class), any(Boolean.class));
+
+        doAnswer(invocation -> {
+            InputStreamCallback callback = 
(InputStreamCallback)invocation.getArguments()[1];
+            callback.process(new java.io.ByteArrayInputStream("Server 
content".getBytes()));
+            return null;
+        }).when(processSession).read(any(FlowFile.class), 
any(InputStreamCallback.class));
+
+        // Execute test using mock
+        int flowFileSent = serverProtocol.transferFlowFiles(peer, context, 
processSession, negotiatedCoded);
+        assertEquals(2, flowFileSent);
+
+        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+
+        // Commit transaction
+        flowFileSent = serverProtocol.commitTransferTransaction(peer, 
"2747386400");
+        assertEquals(2, flowFileSent);
+    }
+
+    private DataPacket createClientDataPacket() {
+        final String contents = "Content from client.";
+        final byte[] bytes = contents.getBytes();
+        final InputStream in = new ByteArrayInputStream(bytes);
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("client-attr-1", "client-attr-1-value");
+        attributes.put("client-attr-2", "client-attr-2-value");
+        return new StandardDataPacket(attributes, in, bytes.length);
+    }
+
+    @Test
+    public void testReceiveZeroFile() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final Peer peer = getDefaultPeer("testReceiveZeroFile");
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        commsSession.setUserDn("unit-test");
+
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = null;
+        final ProcessSession processSession = mock(ProcessSession.class);
+
+
+        final InputStream httpInputStream = new ByteArrayInputStream(new 
byte[]{});
+
+        ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
+
+        // Execute test using mock
+        final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, 
context, processSession, negotiatedCoded);
+        assertEquals(0, flowFileReceived);
+    }
+
+    @Test
+    public void testReceiveOneFile() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testReceiveOneFile";
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        receiveOneFile(serverProtocol, transactionId, peer);
+
+        // Commit transaction
+        commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
+        final int flowFileReceived = 
serverProtocol.commitReceiveTransaction(peer);
+        assertEquals(1, flowFileReceived);
+    }
+
+    @Test
+    public void testReceiveOneFileBadChecksum() throws Exception {
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testReceiveOneFileBadChecksum";
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        receiveOneFile(serverProtocol, transactionId, peer);
+
+        // Commit transaction
+        commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM);
+        try {
+            serverProtocol.commitReceiveTransaction(peer);
+            fail();
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("Received a BadChecksum 
response"));
+        }
+    }
+
+    private void receiveOneFile(HttpFlowFileServerProtocol serverProtocol, 
String transactionId, Peer peer) throws IOException {
+        final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
+        commsSession.setUserDn("unit-test");
+
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = mock(ProcessContext.class);
+        final ProcessSession processSession = mock(ProcessSession.class);
+        final ProvenanceReporter provenanceReporter = 
mock(ProvenanceReporter.class);
+        final FlowFile flowFile = mock(FlowFile.class);
+
+        DataPacket dataPacket = createClientDataPacket();
+
+        final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
+        negotiatedCoded.encode(dataPacket, testDataOs);
+        final InputStream httpInputStream = new 
ByteArrayInputStream(testDataOs.toByteArray());
+
+        ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
+
+        doAnswer(invocation -> {
+            InputStream is = (InputStream) invocation.getArguments()[0];
+            for (int b; (b = is.read()) >= 0;) {
+                // consume stream.
+            }
+            return flowFile;
+        }).when(processSession).importFrom(any(InputStream.class), 
any(FlowFile.class));
+        // AbstractFlowFileServerProtocol adopts builder pattern and 
putAttribute is the last execution
+        // which returns flowFile instance used later.
+        
doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), 
any(String.class), any(String.class));
+        
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+        doAnswer(invocation -> {
+            String peerUrl = (String)invocation.getArguments()[1];
+            String detail = (String)invocation.getArguments()[3];
+            assertEquals("http://peer-host:8080/";, peerUrl);
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
+            return null;
+        }).when(provenanceReporter)
+                .receive(any(FlowFile.class), any(String.class), 
any(String.class), any(String.class), any(Long.class));
+
+        Set<Relationship> relations = new HashSet<>();
+        final Relationship relationship = new Relationship.Builder().build();
+        relations.add(relationship);
+        doReturn(relations).when(context).getAvailableRelationships();
+
+        // Execute test using mock
+        int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, 
processSession, negotiatedCoded);
+        assertEquals(1, flowFileReceived);
+
+        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+    }
+
+    @Test
+    public void testReceiveTwoFiles() throws Exception {
+        final HttpRemoteSiteListener remoteSiteListener = 
HttpRemoteSiteListener.getInstance();
+        final HttpFlowFileServerProtocol serverProtocol = 
getDefaultHttpFlowFileServerProtocol();
+
+        final String transactionId = "testReceiveTwoFile";
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = 
(HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "2");
+        commsSession.setUserDn("unit-test");
+
+        serverProtocol.handshake(peer);
+
+        assertTrue(serverProtocol.isHandshakeSuccessful());
+
+        final FlowFileCodec negotiatedCoded = 
serverProtocol.negotiateCodec(peer);
+        final ProcessContext context = mock(ProcessContext.class);
+        final ProcessSession processSession = mock(ProcessSession.class);
+        final ProvenanceReporter provenanceReporter = 
mock(ProvenanceReporter.class);
+        final FlowFile flowFile1 = mock(FlowFile.class);
+        final FlowFile flowFile2 = mock(FlowFile.class);
+
+        final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
+        negotiatedCoded.encode(createClientDataPacket(), testDataOs);
+        negotiatedCoded.encode(createClientDataPacket(), testDataOs);
+        final InputStream httpInputStream = new 
ByteArrayInputStream(testDataOs.toByteArray());
+
+        ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
+
+        doAnswer(invocation -> {
+            InputStream is = (InputStream) invocation.getArguments()[0];
+            for (int b; (b = is.read()) >= 0;) {
+                // consume stream.
+            }
+            return flowFile1;
+        }).when(processSession).importFrom(any(InputStream.class), 
any(FlowFile.class));
+        // AbstractFlowFileServerProtocol adopts builder pattern and 
putAttribute is the last execution
+        // which returns flowFile instance used later.
+        doReturn(flowFile1)
+                .doReturn(flowFile2)
+                .when(processSession).putAttribute(any(FlowFile.class), 
any(String.class), any(String.class));
+        
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+        doAnswer(invocation -> {
+            String peerUrl = (String)invocation.getArguments()[1];
+            String detail = (String)invocation.getArguments()[3];
+            assertEquals("http://peer-host:8080/";, peerUrl);
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
+            return null;
+        }).when(provenanceReporter)
+                .receive(any(FlowFile.class), any(String.class), 
any(String.class), any(String.class), any(Long.class));
+
+        Set<Relationship> relations = new HashSet<>();
+        doReturn(relations).when(context).getAvailableRelationships();
+
+        // Execute test using mock
+        int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, 
processSession, negotiatedCoded);
+        assertEquals(2, flowFileReceived);
+
+        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+
+        // Commit transaction
+        commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
+        flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
+        assertEquals(2, flowFileReceived);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 161be64..864c06e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -2338,9 +2338,11 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         // determine the site to site configuration
         if (isClustered()) {
             
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningPort());
+            
controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getClusterManagerRemoteSiteListeningHttpPort());
             
controllerDTO.setSiteToSiteSecure(controllerFacade.isClusterManagerRemoteSiteCommsSecure());
         } else {
             
controllerDTO.setRemoteSiteListeningPort(controllerFacade.getRemoteSiteListeningPort());
+            
controllerDTO.setRemoteSiteHttpListeningPort(controllerFacade.getRemoteSiteListeningHttpPort());
             
controllerDTO.setSiteToSiteSecure(controllerFacade.isRemoteSiteCommsSecure());
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 2138264..4dee472 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -103,7 +103,7 @@ public abstract class ApplicationResource {
     @Context
     private HttpContext httpContext;
 
-    private NiFiProperties properties;
+    protected NiFiProperties properties;
     private RequestReplicator requestReplicator;
     private ClusterCoordinator clusterCoordinator;
 

Reply via email to