Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client 2aaed7021 -> 77fd8e5ec


NIFI-282: Refactoring to extract client util


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/77fd8e5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/77fd8e5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/77fd8e5e

Branch: refs/heads/site-to-site-client
Commit: 77fd8e5ec7e86095d6235deae91939db6412eeb1
Parents: 2aaed70
Author: Mark Payne <[email protected]>
Authored: Wed Jan 21 20:04:36 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Wed Jan 21 20:04:36 2015 -0500

----------------------------------------------------------------------
 .../nifi/stream/io/LimitingInputStream.java     | 111 ++++
 .../stream/io/MinimumLengthInputStream.java     |  93 +++
 .../org/apache/nifi/remote/Transaction.java     |  44 ++
 .../nifi/remote/client/SiteToSiteClient.java    |   7 +-
 .../nifi/remote/client/socket/SocketClient.java |  54 +-
 .../remote/codec/StandardFlowFileCodec.java     |  78 +--
 .../remote/exception/ProtocolException.java     |   4 +-
 .../nifi/remote/protocol/ClientProtocol.java    |  13 +-
 .../protocol/socket/SocketClientProtocol.java   | 618 +++++--------------
 .../socket/SocketClientTransaction.java         | 260 +++++++-
 .../nifi/remote/util/StandardDataPacket.java    |  50 ++
 .../nifi/remote/StandardRemoteGroupPort.java    |   2 +-
 .../socket/SocketFlowFileServerProtocol.java    |  20 +-
 13 files changed, 758 insertions(+), 596 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
 
b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ 
b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream wrapper that exposes at most {@code limit} bytes of the
+ * wrapped stream. Once that many bytes have been read or skipped, every read
+ * reports end-of-stream (-1) even if the wrapped stream still has data.
+ */
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+    // Value of bytesRead at the most recent mark(); -1 until mark() is called.
+    private long markedBytesRead = -1L;
+
+    /**
+     * @param in the stream to read from
+     * @param limit the maximum number of bytes that may be consumed from {@code in}
+     */
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    /**
+     * Reads a single byte, or returns -1 if the limit has been reached or the
+     * wrapped stream is exhausted.
+     */
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    /**
+     * Equivalent to {@code read(b, 0, b.length)}.
+     */
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes, never more than the number of bytes still
+     * allowed by the limit. Returns -1 once the limit has been reached.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    /**
+     * Skips up to {@code n} bytes without going past the limit.
+     *
+     * @return the number of bytes actually skipped; 0 if {@code n} is not
+     *         positive or the limit has already been reached
+     */
+    @Override
+    public long skip(final long n) throws IOException {
+        final long toSkip = Math.min(n, limit - bytesRead);
+        if (toSkip <= 0) {
+            return 0L;
+        }
+
+        final long skipped = in.skip(toSkip);
+        if (skipped > 0) {
+            bytesRead += skipped;
+        }
+        return skipped;
+    }
+
+    /**
+     * Returns the wrapped stream's estimate, capped at the number of bytes the
+     * limit still permits, so callers are never told more is available than
+     * this stream will hand out.
+     */
+    @Override
+    public int available() throws IOException {
+        final long remaining = Math.max(0L, limit - bytesRead);
+        return (int) Math.min(in.available(), remaining);
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    /**
+     * Marks the wrapped stream and remembers how many bytes had been consumed,
+     * so that {@link #reset()} can rewind the limit accounting as well.
+     */
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+        markedBytesRead = bytesRead;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    /**
+     * Resets the wrapped stream and, if {@link #mark(int)} was called on this
+     * wrapper, restores the consumed-byte count recorded at that point.
+     */
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        // Only rewind the counter when the mark was taken through this wrapper;
+        // otherwise the position the wrapped stream reset to is unknown here.
+        if (markedBytesRead >= 0) {
+            bytesRead = markedBytesRead;
+        }
+    }
+
+    /**
+     * @return the maximum number of bytes this stream will expose
+     */
+    public long getLimit() {
+        return limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
 
b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
new file mode 100644
index 0000000..2e93599
--- /dev/null
+++ 
b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that throws EOFException if the underlying InputStream runs
+ * out of data before the configured minimum number of bytes has been consumed
+ * (read or skipped) through this stream.
+ */
+public class MinimumLengthInputStream extends FilterInputStream {
+
+       private final long minLength;
+       private long consumedCount = 0L;
+
+       /**
+        * @param in the stream to read from
+        * @param minLength the number of bytes that must be consumed before
+        *            end-of-stream is accepted without error
+        */
+       public MinimumLengthInputStream(final InputStream in, final long minLength) {
+               super(in);
+               this.minLength = minLength;
+       }
+
+       /**
+        * Reads one byte.
+        *
+        * @return the byte read, or -1 at end-of-stream once the minimum length
+        *         has been satisfied
+        * @throws EOFException if end-of-stream is reached before the minimum
+        *             length has been consumed
+        */
+       @Override
+       public int read() throws IOException {
+               final int b = super.read();
+               if ( b < 0 ) {
+                       verifyMinimumReached();
+                       return b;
+               }
+
+               consumedCount++;
+               return b;
+       }
+
+       /**
+        * Equivalent to {@code read(b, 0, b.length)}.
+        */
+       @Override
+       public int read(byte[] b) throws IOException {
+               return read(b, 0, b.length);
+       }
+
+       /**
+        * Reads up to {@code len} bytes.
+        *
+        * @throws EOFException if end-of-stream is reached before the minimum
+        *             length has been consumed
+        */
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               final int num = super.read(b, off, len);
+               if ( num < 0 ) {
+                       verifyMinimumReached();
+                       return num;
+               }
+
+               consumedCount += num;
+               return num;
+       }
+
+       /**
+        * Skips up to {@code n} bytes.
+        *
+        * @return the number of bytes skipped; 0 if {@code n} is not positive or
+        *         end-of-stream was reached after the minimum length was satisfied
+        * @throws EOFException if end-of-stream is reached before the minimum
+        *             length has been consumed
+        */
+       @Override
+       public long skip(final long n) throws IOException {
+               // Nothing requested: do not fall through to the probe read below,
+               // which would consume a byte the caller never asked to skip.
+               if ( n <= 0 ) {
+                       return 0L;
+               }
+
+               long skipped = super.skip(n);
+               if ( skipped < 1 ) {
+                       // skip() may return 0 without being at end-of-stream, so
+                       // read a single byte to tell the two situations apart.
+                       final int b = super.read();
+                       if ( b < 0 ) {
+                               verifyMinimumReached();
+                               return 0L;
+                       }
+                       skipped = 1;
+               }
+
+               consumedCount += skipped;
+               return skipped;
+       }
+
+       /**
+        * Called when the underlying stream reports end-of-stream; fails if fewer
+        * than the minimum number of bytes have been consumed.
+        */
+       private void verifyMinimumReached() throws EOFException {
+               if ( consumedCount < minLength ) {
+                       throw new EOFException("Expected at least " + minLength
+                                       + " bytes but stream ended after " + consumedCount);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..6c136fc
--- /dev/null
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * Operations available on a transaction obtained from a site-to-site client.
+ * The precise protocol semantics of each call are defined by the implementing
+ * class and are not visible from this interface.
+ */
+public interface Transaction {
+
+       /**
+        * Sends the given data packet as part of this transaction.
+        *
+        * @throws IOException if the operation fails
+        */
+       void send(DataPacket dataPacket) throws IOException;
+
+       /**
+        * Receives a data packet as part of this transaction.
+        *
+        * @throws IOException if the operation fails
+        */
+       DataPacket receive() throws IOException;
+
+       /**
+        * Confirms this transaction.
+        *
+        * @throws IOException if the operation fails
+        */
+       void confirm() throws IOException;
+
+       /**
+        * Completes this transaction.
+        *
+        * @param applyBackpressure passed through to the implementation
+        *            (NOTE(review): exact effect is implementation-defined)
+        * @throws IOException if the operation fails
+        */
+       void complete(boolean applyBackpressure) throws IOException;
+
+       /**
+        * Cancels this transaction.
+        *
+        * @throws IOException if the operation fails
+        */
+       void cancel() throws IOException;
+
+       /**
+        * @return the current state of this transaction
+        * @throws IOException if the state cannot be determined
+        */
+       TransactionState getState() throws IOException;
+
+       /**
+        * The states reported by {@link #getState()}.
+        */
+       enum TransactionState {
+               TRANSACTION_STARTED,
+               DATA_EXCHANGED,
+               TRANSACTION_CONFIRMED,
+               TRANSACTION_COMPLETED,
+               TRANSACTION_CANCELED
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 34cb56a..164a63c 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -19,12 +19,11 @@ package org.apache.nifi.remote.client;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
 
 public interface SiteToSiteClient extends Closeable {
 
-       void send(DataPacket dataPacket) throws IOException;
-       
-       DataPacket receive() throws IOException;
+       Transaction createTransaction(TransferDirection direction) throws 
IOException;
        
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index b81b425..88eb5e8 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -24,6 +24,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -65,7 +66,7 @@ public class SocketClient implements SiteToSiteClient {
        
        
        @Override
-       public void send(final DataPacket dataPacket) throws IOException {
+       public Transaction createTransaction(final TransferDirection direction) 
throws IOException {
                final String portId = getPortIdentifier(TransferDirection.SEND);
                
                if ( portId == null ) {
@@ -91,19 +92,58 @@ public class SocketClient implements SiteToSiteClient {
                
                final EndpointConnectionState connectionState;
                try {
-                       connectionState = 
pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+                       connectionState = 
pool.getEndpointConnectionState(remoteDestination, direction);
                } catch (final ProtocolException | HandshakeException | 
PortNotRunningException | UnknownPortException e) {
                        throw new IOException(e);
                }
                
+               final Transaction transaction = 
connectionState.getSocketClientProtocol().startTransaction(
+                               connectionState.getPeer(), 
connectionState.getCodec(), direction);
                
-       }
+               // Wrap the transaction in a new one that will return the 
EndpointConnectionState back to the pool whenever
+               // the transaction is either completed or canceled.
+               return new Transaction() {
+                       @Override
+                       public void confirm() throws IOException {
+                               transaction.confirm();
+                       }
 
-       @Override
-       public DataPacket receive() throws IOException {
-               // TODO Auto-generated method stub
-               return null;
+                       @Override
+                       public void complete(final boolean applyBackpressure) 
throws IOException {
+                               try {
+                                       transaction.complete(applyBackpressure);
+                               } finally {
+                                       pool.offer(connectionState);
+                               }
+                       }
+
+                       @Override
+                       public void cancel() throws IOException {
+                               try {
+                                       transaction.cancel();
+                               } finally {
+                                       pool.offer(connectionState);
+                               }
+                       }
+
+                       @Override
+                       public void send(final DataPacket dataPacket) throws 
IOException {
+                               transaction.send(dataPacket);
+                       }
+
+                       @Override
+                       public DataPacket receive() throws IOException {
+                               return transaction.receive();
+                       }
+
+                       @Override
+                       public TransactionState getState() throws IOException {
+                               return transaction.getState();
+                       }
+                       
+               };
        }
+
        
        @Override
        public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index d18a4ee..6fd92de 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -26,14 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
 
 public class StandardFlowFileCodec implements FlowFileCodec {
        public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec 
{
     }
     
     @Override
-    public FlowFile encode(final FlowFile flowFile, final ProcessSession 
session, final OutputStream encodedOut) throws IOException {
+    public void encode(final DataPacket dataPacket, final OutputStream 
encodedOut) throws IOException {
         final DataOutputStream out = new DataOutputStream(encodedOut);
         
-        final Map<String, String> attributes = flowFile.getAttributes();
+        final Map<String, String> attributes = dataPacket.getAttributes();
         out.writeInt(attributes.size());
         for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
             writeString(entry.getKey(), out);
             writeString(entry.getValue(), out);
         }
         
-        out.writeLong(flowFile.getSize());
-        
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                final byte[] buffer = new byte[8192];
-                int len;
-                while ( (len = in.read(buffer)) > 0 ) {
-                    encodedOut.write(buffer, 0, len);
-                }
-                
-                encodedOut.flush();
-            }
-        });
+        out.writeLong(dataPacket.getSize());
         
-        return flowFile;
+        final InputStream in = dataPacket.getData();
+        StreamUtils.copy(in, encodedOut);
+        encodedOut.flush();
     }
 
     
     @Override
-    public FlowFile decode(final InputStream stream, final ProcessSession 
session) throws IOException, ProtocolException {
+    public DataPacket decode(final InputStream stream) throws IOException, 
ProtocolException {
         final DataInputStream in = new DataInputStream(stream);
         
         final int numAttributes;
@@ -94,43 +81,16 @@ public class StandardFlowFileCodec implements FlowFileCodec 
{
                throw new ProtocolException("FlowFile exceeds maximum number of 
attributes with a total of " + numAttributes);
         }
         
-        try {
-            final Map<String, String> attributes = new 
HashMap<>(numAttributes);
-            for (int i=0; i < numAttributes; i++) {
-                final String attrName = readString(in);
-                final String attrValue = readString(in);
-                attributes.put(attrName, attrValue);
-            }
-            
-            final long numBytes = in.readLong();
-            
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    int len;
-                    long size = 0;
-                    final byte[] buffer = new byte[8192];
-                    
-                    while ( size < numBytes && (len = in.read(buffer, 0, (int) 
Math.min(buffer.length, numBytes - size))) > 0 ) {
-                        out.write(buffer, 0, len);
-                        size += len;
-                    }
-
-                    if ( size != numBytes ) {
-                        throw new EOFException("Expected " + numBytes + " 
bytes but received only " + size);
-                    }
-                }
-            });
-
-            return flowFile;
-        } catch (final EOFException e) {
-               session.rollback();
-               
-            // we throw the general IOException here because we did not expect 
to hit EOFException
-            throw e;
+        final Map<String, String> attributes = new HashMap<>(numAttributes);
+        for (int i=0; i < numAttributes; i++) {
+            final String attrName = readString(in);
+            final String attrValue = readString(in);
+            attributes.put(attrName, attrValue);
         }
+        
+        final long numBytes = in.readLong();
+        
+        return new StandardDataPacket(attributes, stream, numBytes);
     }
 
     private void writeString(final String val, final DataOutputStream out) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index 0f50b98..e12348a 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.remote.exception;
 
-public class ProtocolException extends Exception {
+import java.io.IOException;
+
+public class ProtocolException extends IOException {
 
     private static final long serialVersionUID = 5763900324505818495L;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 51d3970..befbdaa 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -50,17 +51,7 @@ public interface ClientProtocol extends 
VersionedRemoteResource {
     
     
     
-    void startTransaction(Peer peer, TransferDirection direction) throws 
IOException, ProtocolException;
-    
-    void completeTransaction(boolean applyBackPressure) throws IOException, 
ProtocolException;
-    
-    void rollbackTransaction();
-    
-    // must be done within a transaction.
-    void transferData(DataPacket dataPacket, FlowFileCodec codec) throws 
IOException, ProtocolException;
-    
-    // must be done within a transaction.
-    DataPacket receiveData(FlowFileCodec codec) throws IOException, 
ProtocolException;
+    Transaction startTransaction(Peer peer, FlowFileCodec codec, 
TransferDirection direction) throws IOException;
     
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 58d26d4..b4d1e5d 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -20,16 +20,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
 
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,18 +38,18 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
@@ -74,6 +70,8 @@ public class SocketClientProtocol implements ClientProtocol {
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
+
+    private SocketClientTransaction transaction;
     
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
     
@@ -236,11 +234,8 @@ public class SocketClientProtocol implements 
ClientProtocol {
     }
 
 
-    // TODO: move up to top with member variables
-    private SocketClientTransaction transaction;
-    
     @Override
-    public void startTransaction(final Peer peer, final TransferDirection 
direction) throws IOException, ProtocolException {
+    public Transaction startTransaction(final Peer peer, final FlowFileCodec 
codec, final TransferDirection direction) throws IOException, ProtocolException 
{
         if ( !handshakeComplete ) {
             throw new IllegalStateException("Handshake has not been 
performed");
         }
@@ -248,204 +243,29 @@ public class SocketClientProtocol implements 
ClientProtocol {
             throw new IllegalStateException("Cannot start transaction; 
handshake resolution was " + handshakeResponse);
         }
         
-        transaction = new SocketClientTransaction(peer, direction, 
useCompression);
-
-        final DataOutputStream dos = transaction.getDataOutputStream();
-        if ( direction == TransferDirection.RECEIVE ) {
-            // Indicate that we would like to have some data
-            RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
-            dos.flush();
-            
-            final Response dataAvailableCode = 
Response.read(transaction.getDataInputStream());
-            switch (dataAvailableCode.getCode()) {
-                case MORE_DATA:
-                    logger.debug("{} {} Indicates that data is available", 
this, peer);
-                    transaction.setDataAvailable(true);
-                    break;
-                case NO_MORE_DATA:
-                    logger.debug("{} No data available from {}", peer);
-                    transaction.setDataAvailable(false);
-                    return;
-                default:
-                    throw new ProtocolException("Got unexpected response when 
asking for data: " + dataAvailableCode);
-            }
-
-        } else {
-            // Indicate that we would like to have some data
-            RequestType.SEND_FLOWFILES.writeRequestType(dos);
-            dos.flush();
-        }
+        return new SocketClientTransaction(versionNegotiator.getVersion(), 
peer, codec, 
+                       direction, useCompression, (int) 
destination.getYieldPeriod(TimeUnit.MILLISECONDS));
     }
-    
-    @Override
-    public DataPacket receiveData(final FlowFileCodec codec) throws 
IOException, ProtocolException {
-       if ( transaction == null ) {
-               throw new IllegalStateException("Cannot receive data because no 
transaction has been started");
-       }
-       
-       if ( transaction.getTransferDirection() == TransferDirection.SEND ) {
-           throw new IllegalStateException("Attempting to receive data but 
started a SEND Transaction");
-       }
 
-       // if no data available, return null
-       if ( !transaction.isDataAvailable() ) {
-           return null;
-       }
-       
-       final Peer peer = transaction.getPeer();
-        logger.debug("{} Receiving data from {}", this, peer);
-        final DataPacket packet = 
codec.decode(transaction.createCheckedInputStream());
-        
-        if ( packet != null ) {
-            transaction.incrementTransferCount();
-            
-            // Determine if Peer will send us data or has no data to send us
-            final DataInputStream dis = transaction.getDataInputStream();
-            final Response dataAvailableCode = Response.read(dis);
-            switch (dataAvailableCode.getCode()) {
-                case MORE_DATA:
-                    logger.debug("{} {} Indicates that data is available", 
this, peer);
-                    transaction.setDataAvailable(true);
-                    break;
-                case NO_MORE_DATA:
-                    logger.debug("{} No data available from {}", peer);
-                    transaction.setDataAvailable(false);
-                    break;
-                default:
-                    throw new ProtocolException("Got unexpected response when 
asking for data: " + dataAvailableCode);
-            }
-        }
-        
-        return packet;
-    }
 
     
-    @Override
-    public void transferData(final DataPacket dataPacket, final FlowFileCodec 
codec) throws IOException, ProtocolException {
-        if ( transaction == null ) {
-            throw new IllegalStateException("Cannot send data because no 
transaction has been started");
-        }
-
-        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) 
{
-            throw new IllegalStateException("Attempting to send data but 
started a RECEIVE Transaction");
-        }
-
-        final Peer peer = transaction.getPeer();
-        logger.debug("{} Sending data to {}", this, peer);
-
-        if ( transaction.getTransferCount() > 0 ) {
-            
ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream());
-        }
+    // TODO: Transaction should be pulled out into its own class.
+    //                 Flow of execution:
+    //                 - start transaction
+    //                 - send/receive data
+    //                 - confirm contents
+    //                         - complete / rollback
+    //
+    //                 - this class should validate transaction state before 
each step.
+    // We need to confirm transaction to ensure that data is correct. Yes, it 
is sent via TCP, which should ensure that the
+    // data is correct, but things happen. Humans make mistakes. There could 
easily be a bug on our end, for example. And this
+    // will ensure that we guard against that. It's a good defensive 
programming strategy.
+    public void confirmTransaction() throws IOException {
         
-        final CheckedOutputStream checkedOutStream = 
transaction.createCheckedOutputStream();
-        codec.encode(dataPacket, checkedOutStream);
-        
-        // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
-        // Otherwise, do NOT close it because we don't want to close the 
underlying stream
-        // (CompressionOutputStream will not close the underlying stream when 
it's closed)
-        if ( useCompression ) {
-            checkedOutStream.close();
-        }
-        
-        transaction.incrementTransferCount();
     }
     
     
-    @Override
-    public void completeTransaction(final boolean applyBackPressure) throws 
ProtocolException, IOException {
-        final SocketClientTransaction transaction = this.transaction;
-        this.transaction = null;
-        
-        if ( transaction == null ) {
-            throw new IllegalStateException("Cannot complete transaction 
because no transaction has been started");
-        }
-        
-        final Peer peer = transaction.getPeer();
-        
-        if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) 
{
-            final boolean moreData = transaction.isDataAvailable();
-            if ( moreData ) {
-                throw new IllegalStateException("Cannot complete transaction 
because the sender has already sent more data than client has consumed.");
-            }
-            
-            // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-            // to peer so that we can verify that the connection is still 
open. This is a two-phase commit,
-            // which helps to prevent the chances of data duplication. Without 
doing this, we may commit the
-            // session and then when we send the response back to the peer, 
the peer may have timed out and may not
-            // be listening. As a result, it will re-send the data. By doing 
this two-phase commit, we narrow the
-            // Critical Section involved in this transaction so that rather 
than the Critical Section being the
-            // time window involved in the entire transaction, it is reduced 
to a simple round-trip conversation.
-            logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", 
this, peer);
-            final String calculatedCRC = transaction.calculateCRC();
-            
ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(),
 calculatedCRC);
-            
-            final Response confirmTransactionResponse = 
Response.read(transaction.getDataInputStream());
-            logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
-            
-            switch (confirmTransactionResponse.getCode()) {
-                case CONFIRM_TRANSACTION:
-                    break;
-                case BAD_CHECKSUM:
-                    throw new IOException(this + " Received a BadChecksum 
response from peer " + peer);
-                default:
-                    throw new ProtocolException(this + " Received unexpected 
Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 
'Confirm Transaction' Response Code");
-            }
-            
-            if ( applyBackPressure ) {
-                // Confirm that we received the data and the peer can now 
discard it but that the peer should not
-                // send any more data for a bit
-                logger.debug("{} Sending 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
-                
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream());
-            } else {
-                // Confirm that we received the data and the peer can now 
discard it
-                logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, 
peer);
-                
ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream());
-            }
-        } else {
-            logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, 
peer);
-            
ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream());
-            
-            final String calculatedCRC = transaction.calculateCRC();
-            
-            // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to 
send a 'Confirm Transaction' response
-            final Response transactionConfirmationResponse = 
Response.read(transaction.getDataInputStream());
-            if ( transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION ) {
-                // Confirm checksum and echo back the confirmation.
-                logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
-                final String receivedCRC = 
transactionConfirmationResponse.getMessage();
-                
-                if ( versionNegotiator.getVersion() > 3 ) {
-                    if ( !receivedCRC.equals(calculatedCRC) ) {
-                        
ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream());
-                        throw new IOException(this + " Sent data to peer " + 
peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer 
calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and 
rolling back session");
-                    }
-                }
-                
-                
ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(),
 "");
-            } else {
-                throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
-            }
-        
-            final Response transactionResponse;
-            try {
-                transactionResponse = 
Response.read(transaction.getDataInputStream());
-            } catch (final IOException e) {
-                throw new IOException(this + " Failed to receive a response 
from " + peer + " when expecting a TransactionFinished Indicator. " +
-                        "It is unknown whether or not the peer successfully 
received/processed the data.", e);
-            }
-            
-            logger.debug("{} Received {} from {}", this, transactionResponse, 
peer);
-            if ( transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-                
peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-            } else if ( transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED ) {
-                throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
-            }
-        }
-    }
-    
-    
-    @Override
-    public void rollbackTransaction() {
+    public void cancelTransaction() {
         final SocketClientTransaction transaction = this.transaction;
         this.transaction = null;
         
@@ -456,296 +276,134 @@ public class SocketClientProtocol implements 
ClientProtocol {
         // TODO: IMPLEMENT
     }
     
+    
     @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot receive files; handshake 
resolution was " + handshakeResponse);
-        }
-
-        logger.debug("{} Receiving FlowFiles from {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        // Determine if Peer will send us data or has no data to send us
-        final Response dataAvailableCode = Response.read(dis);
-        switch (dataAvailableCode.getCode()) {
-            case MORE_DATA:
-                logger.debug("{} {} Indicates that data is available", this, 
peer);
-                break;
-            case NO_MORE_DATA:
-               context.yield();
-                logger.debug("{} No data available from {}", peer);
-                return;
-            default:
-                throw new ProtocolException("Got unexpected response when 
asking for data: " + dataAvailableCode);
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-        final Set<FlowFile> flowFilesReceived = new HashSet<>();
-        long bytesReceived = 0L;
-        final CRC32 crc = new CRC32();
-        
-        // Peer has data. Decode the bytes into FlowFiles until peer says he's 
finished sending data.
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        while (continueTransaction) {
-            final InputStream flowFileInputStream = useCompression ? new 
CompressionInputStream(dis) : dis;
-            final CheckedInputStream checkedIn = new 
CheckedInputStream(flowFileInputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            
-            final DataPacket dataPacket = codec.decode(checkedIn);
-            FlowFile flowFile = session.create();
-            flowFile = session.importFrom(dataPacket.getData(), flowFile);
-            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
-            
-            final long transmissionNanos = System.nanoTime() - startNanos;
-            final long transmissionMillis = 
TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
-            
-            final String sourceFlowFileIdentifier = 
flowFile.getAttribute(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, 
CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-            
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", 
Remote DN=" + userDn, transmissionMillis);
-            
-            session.transfer(flowFile, Relationship.ANONYMOUS);
-            bytesReceived += flowFile.getSize();
-            flowFilesReceived.add(flowFile);
-            logger.debug("{} Received {} from {}", this, flowFile, peer);
-            
-            final Response transactionCode = Response.read(dis);
-            switch (transactionCode.getCode()) {
-                case CONTINUE_TRANSACTION:
-                    logger.trace("{} Received ContinueTransaction indicator 
from {}", this, peer);
-                    break;
-                case FINISH_TRANSACTION:
-                    logger.trace("{} Received FinishTransaction indicator from 
{}", this, peer);
-                    continueTransaction = false;
-                    calculatedCRC = 
String.valueOf(checkedIn.getChecksum().getValue());
-                    break;
-                default:
-                    throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" 
+ transactionCode);
-            }
-        }
-        
-        // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-        // to peer so that we can verify that the connection is still open. 
This is a two-phase commit,
-        // which helps to prevent the chances of data duplication. Without 
doing this, we may commit the
-        // session and then when we send the response back to the peer, the 
peer may have timed out and may not
-        // be listening. As a result, it will re-send the data. By doing this 
two-phase commit, we narrow the
-        // Critical Section involved in this transaction so that rather than 
the Critical Section being the
-        // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
-        logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", 
this, peer);
-        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-        
-        final Response confirmTransactionResponse = Response.read(dis);
-        logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
-        
-        switch (confirmTransactionResponse.getCode()) {
-            case CONFIRM_TRANSACTION:
-                break;
-            case BAD_CHECKSUM:
-                session.rollback();
-                throw new IOException(this + " Received a BadChecksum response 
from peer " + peer);
-            default:
-                throw new ProtocolException(this + " Received unexpected 
Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 
'Confirm Transaction' Response Code");
-        }
-        
-        // Commit the session so that we have persisted the data
-        session.commit();
-        
-        if ( context.getAvailableRelationships().isEmpty() ) {
-            // Confirm that we received the data and the peer can now discard 
it but that the peer should not
-            // send any more data for a bit
-            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL 
to {}", this, peer);
-            
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
-        } else {
-            // Confirm that we received the data and the peer can now discard 
it
-            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
-            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-        }
-        
-        stopWatch.stop();
-        final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-        final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-        logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-    }
+       final String userDn = peer.getCommunicationsSession().getUserDn();
+       final Transaction transaction = startTransaction(peer, codec, 
TransferDirection.RECEIVE);
+       
+       final StopWatch stopWatch = new StopWatch(true);
+       final Set<FlowFile> flowFilesReceived = new HashSet<>();
+       long bytesReceived = 0L;
+       
+       while (true) {
+               final long start = System.nanoTime();
+               final DataPacket dataPacket = transaction.receive();
+               if ( dataPacket == null ) {
+                       break;
+               }
+               
+               FlowFile flowFile = session.create();
+               flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
+               flowFile = session.importFrom(dataPacket.getData(), flowFile);
+               final long receiveNanos = System.nanoTime() - start;
+               
+                       String sourceFlowFileIdentifier = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+                       if ( sourceFlowFileIdentifier == null ) {
+                               sourceFlowFileIdentifier = "<Unknown 
Identifier>";
+                       }
+                       
+                       final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+                       session.getProvenanceReporter().receive(flowFile, 
transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + 
peer.getHost() + ", Remote DN=" + userDn, 
TimeUnit.NANOSECONDS.toMillis(receiveNanos));
 
-    @Override
-    public void transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot transfer files; handshake 
resolution was " + handshakeResponse);
-        }
+               session.transfer(flowFile, Relationship.ANONYMOUS);
+               bytesReceived += dataPacket.getSize();
+       }
 
-        FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
-            return;
-        }
+       // Confirm that what we received was the correct data.
+       transaction.confirm();
+       
+               // Commit the session so that we have persisted the data
+               session.commit();
 
-        logger.debug("{} Sending FlowFiles to {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.SEND_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        final StopWatch stopWatch = new StopWatch(true);
-        final CRC32 crc = new CRC32();
-        
-        long bytesSent = 0L;
-        final Set<FlowFile> flowFilesSent = new HashSet<>();
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        final long startSendingNanos = System.nanoTime();
-        while (continueTransaction) {
-            final OutputStream flowFileOutputStream = useCompression ? new 
CompressionOutputStream(dos) : dos;
-            logger.debug("{} Sending {} to {}", this, flowFile, peer);
-            
-            final CheckedOutputStream checkedOutStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            
-            // call codec.encode within a session callback so that we have the 
InputStream to read the FlowFile
-            final FlowFile toWrap = flowFile;
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    final DataPacket dataPacket = new DataPacket() {
-                        @Override
-                        public Map<String, String> getAttributes() {
-                            return toWrap.getAttributes();
-                        }
+               // We want to apply backpressure if the outgoing connections 
are full. I.e., there are no available relationships.
+               final boolean applyBackpressure = 
context.getAvailableRelationships().isEmpty();
 
-                        @Override
-                        public InputStream getData() {
-                            return in;
-                        }
+               transaction.complete(applyBackpressure);
+               logger.debug("{} Sending 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
 
-                        @Override
-                        public long getSize() {
-                            return toWrap.getSize();
-                        }
-                    };
-                    
-                    codec.encode(dataPacket, checkedOutStream);
-                }
-            });
-            
-            final long transferNanos = System.nanoTime() - startNanos;
-            final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-            
-            // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
-            // Otherwise, do NOT close it because we don't want to close the 
underlying stream
-            // (CompressionOutputStream will not close the underlying stream 
when it's closed)
-            if ( useCompression ) {
-                checkedOutStream.close();
-            }
-            
-            flowFilesSent.add(flowFile);
-            bytesSent += flowFile.getSize();
-            logger.debug("{} Sent {} to {}", this, flowFile, peer);
-            
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
-            session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
-            session.remove(flowFile);
-            
-            final long sendingNanos = System.nanoTime() - startSendingNanos;
-            if ( sendingNanos < BATCH_SEND_NANOS ) { 
-                flowFile = session.get();
-            } else {
-                flowFile = null;
-            }
-            
-            continueTransaction = (flowFile != null);
-            if ( continueTransaction ) {
-                logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", 
this, peer);
-                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-            } else {
-                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", 
this, peer);
-                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                
-                calculatedCRC = String.valueOf( 
checkedOutStream.getChecksum().getValue() );
-            }
-        }
-        
-        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to 
send a 'Confirm Transaction' response
-        final Response transactionConfirmationResponse = Response.read(dis);
-        if ( transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION ) {
-            // Confirm checksum and echo back the confirmation.
-            logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
-            final String receivedCRC = 
transactionConfirmationResponse.getMessage();
-            
-            if ( versionNegotiator.getVersion() > 3 ) {
-                if ( !receivedCRC.equals(calculatedCRC) ) {
-                    ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                    session.rollback();
-                    throw new IOException(this + " Sent data to peer " + peer 
+ " but calculated CRC32 Checksum as " + calculatedCRC + " while peer 
calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and 
rolling back session");
-                }
-            }
-            
-            ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
-        } else {
-            throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
-        }
+               stopWatch.stop();
+               final String flowFileDescription = flowFilesReceived.size() < 
20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
+               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+               final String dataSize = 
FormatUtils.formatDataSize(bytesReceived);
+               logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] { 
+                               this, flowFileDescription, dataSize, peer, 
uploadMillis, uploadDataRate });
+    }
 
-        final String flowFileDescription = (flowFilesSent.size() < 20) ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+    
+    @Override
+    public void transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
 
-        final Response transactionResponse;
-        try {
-            transactionResponse = Response.read(dis);
-        } catch (final IOException e) {
-            logger.error("{} Failed to receive a response from {} when 
expecting a TransactionFinished Indicator." +
-                    " It is unknown whether or not the peer successfully 
received/processed the data." +
-                    " Therefore, {} will be rolled back, possibly resulting in 
data duplication of {}", 
-                    this, peer, session, flowFileDescription);
-            session.rollback();
-            throw e;
-        }
-        
-        logger.debug("{} Received {} from {}", this, transactionResponse, 
peer);
-        if ( transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-            peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-        } else if ( transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED ) {
-            throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
-        }
-        
-        // consume input stream entirely, ignoring its contents. If we
-        // don't do this, the Connection will not be returned to the pool
-        stopWatch.stop();
-        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesSent);
-        
-        session.commit();
-        
-        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at 
a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
+               try {
+                       final String userDn = 
peer.getCommunicationsSession().getUserDn();
+                       final long startSendingNanos = System.nanoTime();
+                       final StopWatch stopWatch = new StopWatch(true);
+                       long bytesSent = 0L;
+                       
+                       final Transaction transaction = startTransaction(peer, 
codec, TransferDirection.SEND);
+                       
+                       final Set<FlowFile> flowFilesSent = new HashSet<>();
+               boolean continueTransaction = true;
+               while (continueTransaction) {
+                       final long startNanos = System.nanoTime();
+                   // call codec.encode within a session callback so that we 
have the InputStream to read the FlowFile
+                   final FlowFile toWrap = flowFile;
+                   session.read(flowFile, new InputStreamCallback() {
+                       @Override
+                       public void process(final InputStream in) throws 
IOException {
+                           final DataPacket dataPacket = new 
StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+                           transaction.send(dataPacket);
+                       }
+                   });
+                   
+                   final long transferNanos = System.nanoTime() - startNanos;
+                   final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+                   
+                   flowFilesSent.add(flowFile);
+                   bytesSent += flowFile.getSize();
+                   logger.debug("{} Sent {} to {}", this, flowFile, peer);
+                   
+                   final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
+                   session.getProvenanceReporter().send(flowFile, transitUri, 
"Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, 
false);
+                   session.remove(flowFile);
+                   
+                   final long sendingNanos = System.nanoTime() - 
startSendingNanos;
+                   if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                       flowFile = session.get();
+                   } else {
+                       flowFile = null;
+                   }
+                   
+                   continueTransaction = (flowFile != null);
+               }
+               
+               transaction.confirm();
+               
+               // consume input stream entirely, ignoring its contents. If we
+               // don't do this, the Connection will not be returned to the 
pool
+               stopWatch.stop();
+               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesSent);
+               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+               final String dataSize = FormatUtils.formatDataSize(bytesSent);
+               
+               session.commit();
+               transaction.complete(false);
+               
+               final String flowFileDescription = (flowFilesSent.size() < 20) 
? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+               logger.info("{} Successfully sent {} ({}) to {} in {} 
milliseconds at a rate of {}", new Object[] {
+                   this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
+               } catch (final Exception e) {
+                       session.rollback();
+                       throw e;
+               }
     }
-
+    
+    
     @Override
     public VersionNegotiator getVersionNegotiator() {
         return versionNegotiator;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 83522a5..129e5aa 100644
--- 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,74 +19,272 @@ package org.apache.nifi.remote.protocol.socket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 
 import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class SocketClientTransaction {
-       private final long startTime = System.nanoTime();
-       private final CRC32 crc = new CRC32();
+public class SocketClientTransaction implements Transaction {
+       private static final Logger logger = 
LoggerFactory.getLogger(SocketClientTransaction.class);
        
-       private final Peer peer;
        
+       private final CRC32 crc = new CRC32();
+       private final int protocolVersion;
+       private final FlowFileCodec codec;
        private final DataInputStream dis;
        private final DataOutputStream dos;
        private final TransferDirection direction;
+       private final boolean compress;
+       private final Peer peer;
+       private final int penaltyMillis;
        
        private boolean dataAvailable = false;
        private int transfers = 0;
+       private TransactionState state;
        
-       SocketClientTransaction(final Peer peer, final TransferDirection 
direction, final boolean useCompression) throws IOException {
+       /**
+        * Creates a transaction over the given peer's communications session and
+        * immediately opens it by calling {@code initialize()}, which performs
+        * network I/O with the peer.
+        *
+        * @param protocolVersion negotiated protocol version; consulted when confirming
+        *            a SEND transaction to decide whether the CRC32 checksum is compared
+        * @param peer the peer whose input and output streams are used
+        * @param codec codec used to encode outgoing and decode incoming data packets
+        * @param direction whether this transaction sends or receives data
+        * @param useCompression whether compression was requested for this transaction
+        * @param penaltyMillis how long the peer is penalized when it reports that the
+        *            destination is full
+        * @throws IOException if the streams cannot be obtained or the transaction
+        *             cannot be opened with the peer
+        */
+       SocketClientTransaction(final int protocolVersion, final Peer peer, 
final FlowFileCodec codec, 
+                       final TransferDirection direction, final boolean 
useCompression, final int penaltyMillis) throws IOException {
+               this.protocolVersion = protocolVersion;
                this.peer = peer;
+               this.codec = codec;
                this.direction = direction;
                this.dis = new 
DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
                this.dos = new 
DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+               this.compress = useCompression;
+               this.state = TransactionState.TRANSACTION_STARTED;
+               this.penaltyMillis = penaltyMillis;
+               
+               initialize();
        }
        
-       int getTransferCount() {
-           return transfers;
+       /**
+        * Opens the transaction by writing the request type that matches this
+        * transaction's direction. For a RECEIVE transaction the peer's reply is then
+        * read and recorded in {@code dataAvailable}; for a SEND transaction no reply
+        * is read here.
+        *
+        * @throws IOException if the request cannot be written or the reply cannot be read
+        * @throws ProtocolException if the peer answers a RECEIVE request with anything
+        *             other than MORE_DATA or NO_MORE_DATA
+        */
+       private void initialize() throws IOException {
+               if ( direction == TransferDirection.RECEIVE ) {
+                       // Indicate that we would like to have some data
+                       RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+                       dos.flush();
+
+                       final Response dataAvailableCode = Response.read(dis);
+                       switch (dataAvailableCode.getCode()) {
+                               case MORE_DATA:
+                                       logger.debug("{} {} Indicates that data is available", this, peer);
+                                       this.dataAvailable = true;
+                                       break;
+                               case NO_MORE_DATA:
+                                       // the message has two placeholders, so both 'this' and the peer are supplied
+                                       logger.debug("{} No data available from {}", this, peer);
+                                       this.dataAvailable = false;
+                                       break;
+                               default:
+                                       throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+                       }
+               } else {
+                       // Indicate that we would like to send some data
+                       RequestType.SEND_FLOWFILES.writeRequestType(dos);
+                       dos.flush();
+               }
        }
        
-       void incrementTransferCount() {
-           transfers++;
-       }
        
-       void setDataAvailable(final boolean available) {
-           this.dataAvailable = available;
+       /**
+        * Receives the next data packet from the peer, or returns null when the peer
+        * has indicated that no (more) data is available.
+        * <p>
+        * Once a packet has been decoded the transaction state becomes DATA_EXCHANGED
+        * and the peer's next response is read to learn whether another packet follows.
+        *
+        * @return the decoded packet, or null if no data is available
+        * @throws IOException if unable to read from the peer
+        * @throws ProtocolException if the response following a packet is neither
+        *             MORE_DATA nor NO_MORE_DATA
+        * @throws IllegalStateException if the state is neither TRANSACTION_STARTED nor
+        *             DATA_EXCHANGED, or if this is a SEND transaction
+        */
+       @Override
+       public DataPacket receive() throws IOException {
+               if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+                       throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+               }
+
+               if ( direction == TransferDirection.SEND ) {
+                       throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+               }
+
+               // if no data available, return null
+               if ( !dataAvailable ) {
+                       return null;
+               }
+
+               logger.debug("{} Receiving data from {}", this, peer);
+
+               // When compression was requested the payload has to be decompressed before it is
+               // decoded; the checksum is computed over the decompressed bytes.
+               final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
+               final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
+
+               if ( packet != null ) {
+                       transfers++;
+                       // data has now been exchanged, which is what confirm() requires
+                       state = TransactionState.DATA_EXCHANGED;
+
+                       // Determine if Peer will send us data or has no data to send us
+                       // NOTE(review): assumes codec.decode has consumed the packet's content from the
+                       // stream before returning; if the packet is read lazily this read would consume
+                       // content bytes - TODO confirm
+                       final Response dataAvailableCode = Response.read(dis);
+                       switch (dataAvailableCode.getCode()) {
+                               case MORE_DATA:
+                                       logger.debug("{} {} Indicates that data is available", this, peer);
+                                       this.dataAvailable = true;
+                                       break;
+                               case NO_MORE_DATA:
+                                       // the message has two placeholders, so both 'this' and the peer are supplied
+                                       logger.debug("{} No data available from {}", this, peer);
+                                       this.dataAvailable = false;
+                                       break;
+                               default:
+                                       throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+                       }
+               }
+
+               return packet;
        }
        
-       boolean isDataAvailable() {
-           return dataAvailable;
-       }
        
-       TransferDirection getTransferDirection() {
-           return direction;
+       /**
+        * Sends a single data packet to the peer. Every packet after the first is
+        * preceded by a CONTINUE_TRANSACTION response code. On success the transaction
+        * state becomes DATA_EXCHANGED.
+        *
+        * @param dataPacket the packet to encode and write to the peer
+        * @throws IOException if unable to write to the peer
+        * @throws IllegalStateException if the state is neither TRANSACTION_STARTED nor
+        *             DATA_EXCHANGED, or if this is a RECEIVE transaction
+        */
+       @Override
+       public void send(DataPacket dataPacket) throws IOException {
+               if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+                       throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+               }
+
+               if ( direction == TransferDirection.RECEIVE ) {
+                       throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+               }
+
+               if ( transfers > 0 ) {
+                       ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+               }
+
+               logger.debug("{} Sending data to {}", this, peer);
+
+               // Only wrap in a CompressionOutputStream when compression was requested, so that
+               // the close() below never reaches the connection's own stream.
+               final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
+               final OutputStream out = new CheckedOutputStream(dataOut, crc);
+               codec.encode(dataPacket, out);
+
+               // need to close the CompressionOutputStream in order to force it to write out any remaining bytes.
+               // Otherwise, do NOT close it because we don't want to close the underlying stream
+               // (CompressionOutputStream will not close the underlying stream when it's closed)
+               if ( compress ) {
+                       out.close();
+               }
+
+               transfers++;
+               // data has now been exchanged, which is what confirm() requires
+               state = TransactionState.DATA_EXCHANGED;
        }
        
-       DataOutputStream getDataOutputStream() {
-               return dos;
-       }
        
-       DataInputStream getDataInputStream() {
-           return dis;
+       /**
+        * Marks this transaction as canceled. Afterwards the state checks in send,
+        * receive, confirm and complete reject any further use of the transaction.
+        *
+        * @throws IllegalStateException if the transaction has already been canceled
+        *             or completed
+        */
+       @Override
+       public void cancel() {
+               if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) {
+                       throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+               }
+
+               // TODO: notify the peer that the transaction has been canceled
+               state = TransactionState.TRANSACTION_CANCELED;
        }
        
-       CheckedInputStream createCheckedInputStream() {
-           return new CheckedInputStream(dis, crc);
-       }
        
-       CheckedOutputStream createCheckedOutputStream() {
-           return new CheckedOutputStream(dos, crc);
+       /**
+        * Finishes a transaction that has already been confirmed.
+        * <p>
+        * For a RECEIVE transaction this writes TRANSACTION_FINISHED, or
+        * TRANSACTION_FINISHED_BUT_DESTINATION_FULL when back pressure is requested.
+        * For a SEND transaction this reads the peer's final response and penalizes
+        * the peer for {@code penaltyMillis} if it reports that the destination is full.
+        * On success the transaction state becomes TRANSACTION_COMPLETED.
+        *
+        * @param applyBackPressure whether the peer should be told to hold off sending
+        *            more data; only consulted for RECEIVE transactions
+        * @throws IOException if unable to communicate with the peer
+        * @throws ProtocolException if, after sending, the peer's final response is not
+        *             one of the two TRANSACTION_FINISHED codes
+        * @throws IllegalStateException if the state is not TRANSACTION_CONFIRMED
+        */
+       @Override
+       public void complete(boolean applyBackPressure) throws IOException {
+               if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+                       throw new IllegalStateException("Cannot complete transaction because state is " + state +
+                                       "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+               }
+
+               if ( direction == TransferDirection.RECEIVE ) {
+                       if ( applyBackPressure ) {
+                               // Confirm that we received the data and the peer can now discard it but that the peer should not
+                               // send any more data for a bit
+                               logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+                               ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+                       } else {
+                               // Confirm that we received the data and the peer can now discard it
+                               logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+                               ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+                       }
+               } else {
+                       final Response transactionResponse;
+                       try {
+                               transactionResponse = Response.read(dis);
+                       } catch (final IOException e) {
+                               throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+                                               "It is unknown whether or not the peer successfully received/processed the data.", e);
+                       }
+
+                       logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+                       if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                               peer.penalize(penaltyMillis);
+                       } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+                               throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+                       }
+               }
+
+               // only reached when the exchange above succeeded
+               state = TransactionState.TRANSACTION_COMPLETED;
        }
        
-       Peer getPeer() {
-               return peer;
+       
+       /**
+        * Performs the confirmation round trip of the transaction, exchanging CRC32
+        * checksums of the transferred data with the peer.
+        * <p>
+        * For a RECEIVE transaction the locally calculated checksum is sent with a
+        * CONFIRM_TRANSACTION code and the peer's verdict is awaited. For a SEND
+        * transaction FINISH_TRANSACTION is sent, the peer's checksum is read and, for
+        * protocol versions above 3, compared with the local one before the
+        * confirmation is echoed back. On success the state becomes TRANSACTION_CONFIRMED.
+        *
+        * @throws IOException if unable to communicate with the peer or the checksums differ
+        * @throws ProtocolException if the peer answers with an unexpected response code
+        * @throws IllegalStateException if the state is not DATA_EXCHANGED, or if a
+        *             RECEIVE transaction still has unread data available
+        */
+       @Override
+       public void confirm() throws IOException {
+               if ( state != TransactionState.DATA_EXCHANGED ) {
+                       throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
+                                       "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+               }
+
+               if ( direction == TransferDirection.RECEIVE ) {
+                       if ( dataAvailable ) {
+                               throw new IllegalStateException("Cannot confirm transaction because the sender has already sent more data than client has consumed.");
+                       }
+
+                       // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+                       // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+                       // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+                       // session and then when we send the response back to the peer, the peer may have timed out and may not
+                       // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+                       // Critical Section involved in this transaction so that rather than the Critical Section being the
+                       // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+                       logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+                       final String calculatedCRC = String.valueOf(crc.getValue());
+                       ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+                       final Response confirmTransactionResponse = Response.read(dis);
+                       logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+                       switch (confirmTransactionResponse.getCode()) {
+                               case CONFIRM_TRANSACTION:
+                                       break;
+                               case BAD_CHECKSUM:
+                                       throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+                               default:
+                                       throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+                       }
+               } else {
+                       // write first, so that the log line below is only emitted once the indicator was sent
+                       ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                       logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+
+                       final String calculatedCRC = String.valueOf(crc.getValue());
+
+                       // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+                       final Response transactionConfirmationResponse = Response.read(dis);
+                       if ( transactionConfirmationResponse.getCode() != ResponseCode.CONFIRM_TRANSACTION ) {
+                               throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+                       }
+
+                       // Confirm checksum and echo back the confirmation.
+                       logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+                       final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+                       // CRC was not used before version 4.
+                       // Compare from the locally calculated value so that a response without a
+                       // message is reported as a checksum mismatch rather than a NullPointerException.
+                       if ( protocolVersion > 3 && !calculatedCRC.equals(receivedCRC) ) {
+                               ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                               throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                       }
+
+                       ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+               }
+
+               // only reached when the round trip above succeeded; complete() requires this state
+               state = TransactionState.TRANSACTION_CONFIRMED;
        }
+
        
-       String calculateCRC() {
-           return String.valueOf(crc.getValue());
+       /**
+        * @return the current state of this transaction
+        */
+       @Override
+       public TransactionState getState() {
+               return state;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+/**
+ * {@link DataPacket} implementation that holds a set of attributes, a content
+ * stream and the declared size of that content.
+ */
+public class StandardDataPacket implements DataPacket {
+
+       private final Map<String, String> attributes;
+       private final InputStream stream;
+       private final long size;
+
+       /**
+        * @param attributes the attributes of the packet; the map is stored as given, not copied
+        * @param stream the source of the packet's content; it is wrapped in a
+        *            LimitingInputStream and a MinimumLengthInputStream, both created with
+        *            {@code size} (presumably so that exactly {@code size} bytes are
+        *            exposed - confirm against those classes)
+        * @param size the declared length of the content
+        */
+       public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+               this.attributes = attributes;
+               this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size);
+               this.size = size;
+       }
+
+       /**
+        * @return the attributes supplied at construction
+        */
+       @Override
+       public Map<String, String> getAttributes() {
+               return attributes;
+       }
+
+       /**
+        * @return the wrapped content stream
+        */
+       @Override
+       public InputStream getData() {
+               return stream;
+       }
+
+       /**
+        * @return the declared length of the content
+        */
+       @Override
+       public long getSize() {
+               return size;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 82d8206..a51cdba 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -157,7 +157,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             logger.error(message);
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
-        } catch (final ProtocolException | HandshakeException | IOException e) 
{
+        } catch (final HandshakeException | IOException e) {
             final String message = String.format("%s failed to communicate 
with %s due to %s", this, url, e.toString());
             logger.error(message);
             if ( logger.isDebugEnabled() ) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 887429c..d4b9c2f 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -40,6 +40,7 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PortAuthorizationResult;
 import org.apache.nifi.remote.RemoteResourceFactory;
@@ -53,8 +54,10 @@ import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
@@ -304,7 +307,16 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             final CheckedOutputStream checkedOutputStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
 
             final StopWatch transferWatch = new StopWatch(true);
-            flowFile = codec.encode(flowFile, session, checkedOutputStream);
+            
+            final FlowFile toSend = flowFile;
+            session.read(flowFile, new InputStreamCallback() {
+                               @Override
+                               public void process(final InputStream in) 
throws IOException {
+                                       final DataPacket dataPacket = new 
StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+                                       codec.encode(dataPacket, 
checkedOutputStream);
+                               }
+            });
+            
             final long transmissionMillis = 
transferWatch.getElapsed(TimeUnit.MILLISECONDS);
             
             // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
@@ -427,7 +439,11 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             final InputStream flowFileInputStream = useGzip ? new 
CompressionInputStream(dis) : dis;
             final CheckedInputStream checkedInputStream = new 
CheckedInputStream(flowFileInputStream, crc);
 
-            FlowFile flowFile = codec.decode(checkedInputStream, session);
+            final DataPacket dataPacket = codec.decode(checkedInputStream);
+            FlowFile flowFile = session.create();
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
+            
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = 
flowFile.getAttribute(CoreAttributes.UUID.key());

Reply via email to