http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 1b5412c..36d8bac 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -91,21 +91,22 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
 public class EndpointConnectionPool {
+
     public static final long PEER_REFRESH_PERIOD = 60000L;
     public static final String CATEGORY = "Site-to-Site";
     public static final long REMOTE_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
 
     private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
-       private static final Logger logger = 
LoggerFactory.getLogger(EndpointConnectionPool.class);
-       
-       private final ConcurrentMap<PeerDescription, 
BlockingQueue<EndpointConnection>> connectionQueueMap = new 
ConcurrentHashMap<>();
+    private static final Logger logger = 
LoggerFactory.getLogger(EndpointConnectionPool.class);
+
+    private final ConcurrentMap<PeerDescription, 
BlockingQueue<EndpointConnection>> connectionQueueMap = new 
ConcurrentHashMap<>();
     private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations 
= new ConcurrentHashMap<>();
     private final URI clusterUrl;
     private final String apiUri;
-    
+
     private final AtomicLong peerIndex = new AtomicLong(0L);
-    
+
     private final ReentrantLock peerRefreshLock = new ReentrantLock();
     private volatile List<PeerStatus> peerStatuses;
     private volatile long peerRefreshTime = 0L;
@@ -118,132 +119,129 @@ public class EndpointConnectionPool {
     private final ScheduledExecutorService taskExecutor;
     private final int idleExpirationMillis;
     private final RemoteDestination remoteDestination;
-    
+
     private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
     private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
     private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
     private Integer siteToSitePort;
     private Boolean siteToSiteSecure;
     private long remoteRefreshTime;
-    private final Map<String, String> inputPortMap = new HashMap<>();  // map 
input port name to identifier
-    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
-    
+    private final Map<String, String> inputPortMap = new HashMap<>(); // map 
input port name to identifier
+    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
+
     private volatile int commsTimeout;
     private volatile boolean shutdown = false;
-    
-    
-    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, 
-            final int idleExpirationMillis, final EventReporter eventReporter, 
final File persistenceFile) 
-    {
-       this(clusterUrl, remoteDestination, commsTimeoutMillis, 
idleExpirationMillis, null, eventReporter, persistenceFile);
-    }
-    
+
+    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis,
+            final int idleExpirationMillis, final EventReporter eventReporter, 
final File persistenceFile) {
+        this(clusterUrl, remoteDestination, commsTimeoutMillis, 
idleExpirationMillis, null, eventReporter, persistenceFile);
+    }
+
     public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, final int 
idleExpirationMillis,
-            final SSLContext sslContext, final EventReporter eventReporter, 
final File persistenceFile) 
-    {
+            final SSLContext sslContext, final EventReporter eventReporter, 
final File persistenceFile) {
         Objects.requireNonNull(clusterUrl, "URL cannot be null");
         Objects.requireNonNull(remoteDestination, "Remote Destination/Port 
Identifier cannot be null");
-       try {
-               this.clusterUrl = new URI(clusterUrl);
-       } catch (final URISyntaxException e) {
-               throw new IllegalArgumentException("Invalid Cluster URL: " + 
clusterUrl);
-       }
-       
-       // Trim the trailing /
+        try {
+            this.clusterUrl = new URI(clusterUrl);
+        } catch (final URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid Cluster URL: " + 
clusterUrl);
+        }
+
+        // Trim the trailing /
         String uriPath = this.clusterUrl.getPath();
         if (uriPath.endsWith("/")) {
             uriPath = uriPath.substring(0, uriPath.length() - 1);
         }
         apiUri = this.clusterUrl.getScheme() + "://" + 
this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
-        
+
         this.remoteDestination = remoteDestination;
-       this.sslContext = sslContext;
-       this.peersFile = persistenceFile;
-       this.eventReporter = eventReporter;
-       this.commsTimeout = commsTimeoutMillis;
-       this.idleExpirationMillis = idleExpirationMillis;
-       
-       Set<PeerStatus> recoveredStatuses;
-       if ( persistenceFile != null && persistenceFile.exists() ) {
-               try {
-                       recoveredStatuses = 
recoverPersistedPeerStatuses(peersFile);    
-                       this.peerStatusCache = new 
PeerStatusCache(recoveredStatuses, peersFile.lastModified());
-               } catch (final IOException ioe) {
-                       logger.warn("Failed to recover peer statuses from {} 
due to {}; will continue without loading information from file", 
persistenceFile, ioe);
-               }
-       } else {
-               peerStatusCache = null;
-       }
-
-       // Initialize a scheduled executor and run some maintenance tasks in 
the background to kill off old, unused
-       // connections and keep our list of peers up-to-date.
-       taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-               private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
-               
-                       @Override
-                       public Thread newThread(final Runnable r) {
-                               final Thread thread = 
defaultFactory.newThread(r);
-                               thread.setName("NiFi Site-to-Site Connection 
Pool Maintenance");
-                               return thread;
-                       }
-       });
-
-       taskExecutor.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                               refreshPeers();
-                       }
-       }, 0, 5, TimeUnit.SECONDS);
-
-       taskExecutor.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                               cleanupExpiredSockets();
-                       }
-       }, 5, 5, TimeUnit.SECONDS);
-    }
-    
+        this.sslContext = sslContext;
+        this.peersFile = persistenceFile;
+        this.eventReporter = eventReporter;
+        this.commsTimeout = commsTimeoutMillis;
+        this.idleExpirationMillis = idleExpirationMillis;
+
+        Set<PeerStatus> recoveredStatuses;
+        if (persistenceFile != null && persistenceFile.exists()) {
+            try {
+                recoveredStatuses = recoverPersistedPeerStatuses(peersFile);
+                this.peerStatusCache = new PeerStatusCache(recoveredStatuses, 
peersFile.lastModified());
+            } catch (final IOException ioe) {
+                logger.warn("Failed to recover peer statuses from {} due to 
{}; will continue without loading information from file", persistenceFile, ioe);
+            }
+        } else {
+            peerStatusCache = null;
+        }
+
+        // Initialize a scheduled executor and run some maintenance tasks in 
the background to kill off old, unused
+        // connections and keep our list of peers up-to-date.
+        taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() 
{
+            private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = defaultFactory.newThread(r);
+                thread.setName("NiFi Site-to-Site Connection Pool 
Maintenance");
+                return thread;
+            }
+        });
+
+        taskExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                refreshPeers();
+            }
+        }, 0, 5, TimeUnit.SECONDS);
+
+        taskExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                cleanupExpiredSockets();
+            }
+        }, 5, 5, TimeUnit.SECONDS);
+    }
+
     void warn(final String msg, final Object... args) {
-       logger.warn(msg, args);
-       if ( eventReporter != null ) {
-               eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
-       }
+        logger.warn(msg, args);
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
+        }
     }
-    
+
     void warn(final String msg, final Throwable t) {
-       logger.warn(msg, t);
-       
-       if ( eventReporter != null ) {
-               eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg 
+ ": " + t.toString());
-       }
+        logger.warn(msg, t);
+
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + 
": " + t.toString());
+        }
     }
-    
+
     void error(final String msg, final Object... args) {
-       logger.error(msg, args);
-       if ( eventReporter != null ) {
-               eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
-       }
+        logger.error(msg, args);
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", 
MessageFormatter.arrayFormat(msg, args).getMessage());
+        }
     }
-    
+
     private String getPortIdentifier(final TransferDirection 
transferDirection) throws IOException {
-        if ( remoteDestination.getIdentifier() != null ) {
+        if (remoteDestination.getIdentifier() != null) {
             return remoteDestination.getIdentifier();
         }
-        
-        if ( transferDirection == TransferDirection.RECEIVE ) {
+
+        if (transferDirection == TransferDirection.RECEIVE) {
             return getOutputPortIdentifier(remoteDestination.getName());
         } else {
             return getInputPortIdentifier(remoteDestination.getName());
         }
     }
-    
+
     public EndpointConnection getEndpointConnection(final TransferDirection 
direction) throws IOException, HandshakeException, PortNotRunningException, 
UnknownPortException, ProtocolException {
         return getEndpointConnection(direction, null);
     }
-    
-    
-    public EndpointConnection getEndpointConnection(final TransferDirection 
direction, final SiteToSiteClientConfig config) throws IOException, 
HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
-       //
+
+    public EndpointConnection getEndpointConnection(final TransferDirection 
direction, final SiteToSiteClientConfig config)
+            throws IOException, HandshakeException, PortNotRunningException, 
UnknownPortException, ProtocolException {
+        //
         // Attempt to get a connection state that already exists for this URL.
         //
         FlowFileCodec codec = null;
@@ -255,42 +253,42 @@ public class EndpointConnectionPool {
         logger.debug("{} getting next peer status", this);
         final PeerStatus peerStatus = getNextPeerStatus(direction);
         logger.debug("{} next peer status = {}", this, peerStatus);
-        if ( peerStatus == null ) {
+        if (peerStatus == null) {
             return null;
         }
 
         final PeerDescription peerDescription = 
peerStatus.getPeerDescription();
         BlockingQueue<EndpointConnection> connectionQueue = 
connectionQueueMap.get(peerStatus);
-        if ( connectionQueue == null ) {
+        if (connectionQueue == null) {
             connectionQueue = new LinkedBlockingQueue<>();
             BlockingQueue<EndpointConnection> existing = 
connectionQueueMap.putIfAbsent(peerDescription, connectionQueue);
-            if ( existing != null ) {
+            if (existing != null) {
                 connectionQueue = existing;
             }
         }
-        
+
         final List<EndpointConnection> addBack = new ArrayList<>();
         try {
             do {
                 connection = connectionQueue.poll();
                 logger.debug("{} Connection State for {} = {}", this, 
clusterUrl, connection);
                 final String portId = getPortIdentifier(direction);
-                
-                if ( connection == null && !addBack.isEmpty() ) {
+
+                if (connection == null && !addBack.isEmpty()) {
                     // all available connections have been penalized.
                     logger.debug("{} all Connections for {} are penalized; 
returning no Connection", this, portId);
                     return null;
                 }
-                
-                if ( connection != null && 
connection.getPeer().isPenalized(portId) ) {
+
+                if (connection != null && 
connection.getPeer().isPenalized(portId)) {
                     // we have a connection, but it's penalized. We want to 
add it back to the queue
                     // when we've found one to use.
                     addBack.add(connection);
                     continue;
                 }
-                
+
                 // if we can't get an existing Connection, create one
-                if ( connection == null ) {
+                if (connection == null) {
                     logger.debug("{} No Connection available for Port {}; 
creating new Connection", this, portId);
                     protocol = new SocketClientProtocol();
                     protocol.setDestination(new 
IdEnrichedRemoteDestination(remoteDestination, portId));
@@ -304,7 +302,7 @@ public class EndpointConnectionPool {
                         penalize(peerStatus.getPeerDescription(), 
penalizationMillis);
                         throw ioe;
                     }
-                    
+
                     final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
                     final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
                     try {
@@ -314,72 +312,72 @@ public class EndpointConnectionPool {
                         try {
                             commsSession.close();
                         } catch (final IOException ioe) {
-                               throw e;
+                            throw e;
                         }
                     }
-                
+
                     final String peerUrl = "nifi://" + 
peerDescription.getHostname() + ":" + peerDescription.getPort();
                     peer = new Peer(peerDescription, commsSession, peerUrl, 
clusterUrl.toString());
-    
+
                     // set properties based on config
-                    if ( config != null ) {
+                    if (config != null) {
                         protocol.setTimeout((int) 
config.getTimeout(TimeUnit.MILLISECONDS));
                         
protocol.setPreferredBatchCount(config.getPreferredBatchCount());
                         
protocol.setPreferredBatchSize(config.getPreferredBatchSize());
                         
protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
                     }
-                    
+
                     // perform handshake
                     try {
                         logger.debug("{} performing handshake", this);
                         protocol.handshake(peer);
-                        
+
                         // handle error cases
-                        if ( protocol.isDestinationFull() ) {
-                            logger.warn("{} {} indicates that port {}'s 
destination is full; penalizing peer", 
-                                       this, peer, config.getPortName() == 
null ? config.getPortIdentifier() : config.getPortName());
-                            
+                        if (protocol.isDestinationFull()) {
+                            logger.warn("{} {} indicates that port {}'s 
destination is full; penalizing peer",
+                                    this, peer, config.getPortName() == null ? 
config.getPortIdentifier() : config.getPortName());
+
                             penalize(peer, penalizationMillis);
                             try {
-                               peer.close();
+                                peer.close();
                             } catch (final IOException ioe) {
                             }
-                            
+
                             continue;
-                        } else if ( protocol.isPortInvalid() ) {
-                               penalize(peer, penalizationMillis);
-                               cleanup(protocol, peer);
-                               throw new 
PortNotRunningException(peer.toString() + " indicates that port " + portId + " 
is not running");
-                        } else if ( protocol.isPortUnknown() ) {
-                               penalize(peer, penalizationMillis);
-                               cleanup(protocol, peer);
-                               throw new UnknownPortException(peer.toString() 
+ " indicates that port " + portId + " is not known");
+                        } else if (protocol.isPortInvalid()) {
+                            penalize(peer, penalizationMillis);
+                            cleanup(protocol, peer);
+                            throw new PortNotRunningException(peer.toString() 
+ " indicates that port " + portId + " is not running");
+                        } else if (protocol.isPortUnknown()) {
+                            penalize(peer, penalizationMillis);
+                            cleanup(protocol, peer);
+                            throw new UnknownPortException(peer.toString() + " 
indicates that port " + portId + " is not known");
                         }
-                        
+
                         // negotiate the FlowFileCodec to use
                         logger.debug("{} negotiating codec", this);
                         codec = protocol.negotiateCodec(peer);
                         logger.debug("{} negotiated codec is {}", this, codec);
                     } catch (final PortNotRunningException | 
UnknownPortException e) {
-                       throw e;
+                        throw e;
                     } catch (final Exception e) {
                         penalize(peer, penalizationMillis);
                         cleanup(protocol, peer);
-                        
+
                         final String message = String.format("%s failed to 
communicate with %s due to %s", this, peer == null ? clusterUrl : peer, 
e.toString());
                         error(message);
-                        if ( logger.isDebugEnabled() ) {
+                        if (logger.isDebugEnabled()) {
                             logger.error("", e);
                         }
                         throw e;
                     }
-                    
+
                     connection = new EndpointConnection(peer, protocol, codec);
                 } else {
                     final long lastTimeUsed = connection.getLastTimeUsed();
                     final long millisSinceLastUse = System.currentTimeMillis() 
- lastTimeUsed;
-                    
-                    if ( commsTimeout > 0L && millisSinceLastUse >= 
commsTimeout ) {
+
+                    if (commsTimeout > 0L && millisSinceLastUse >= 
commsTimeout) {
                         cleanup(connection.getSocketClientProtocol(), 
connection.getPeer());
                         connection = null;
                     } else {
@@ -389,68 +387,70 @@ public class EndpointConnectionPool {
                         protocol = connection.getSocketClientProtocol();
                     }
                 }
-            } while ( connection == null || codec == null || commsSession == 
null || protocol == null );
+            } while (connection == null || codec == null || commsSession == 
null || protocol == null);
         } catch (final Throwable t) {
-               if ( commsSession != null ) {
-                       try {
-                               commsSession.close();
-                       } catch (final IOException ioe) {
-                       }
-               }
-               
-               throw t;
+            if (commsSession != null) {
+                try {
+                    commsSession.close();
+                } catch (final IOException ioe) {
+                }
+            }
+
+            throw t;
         } finally {
-            if ( !addBack.isEmpty() ) {
+            if (!addBack.isEmpty()) {
                 connectionQueue.addAll(addBack);
             }
         }
-        
+
         activeConnections.add(connection);
         return connection;
     }
-    
-    
+
     public boolean offer(final EndpointConnection endpointConnection) {
-       final Peer peer = endpointConnection.getPeer();
-       if ( peer == null ) {
-               return false;
-       }
-       
-       final BlockingQueue<EndpointConnection> connectionQueue = 
connectionQueueMap.get(peer.getDescription());
-       if ( connectionQueue == null ) {
-           return false;
-       }
-       
-       activeConnections.remove(endpointConnection);
-       if ( shutdown ) {
-           terminate(endpointConnection);
-           return false;
-       } else {
-           endpointConnection.setLastTimeUsed();
-           return connectionQueue.offer(endpointConnection);
-       }
-    }
-    
+        final Peer peer = endpointConnection.getPeer();
+        if (peer == null) {
+            return false;
+        }
+
+        final BlockingQueue<EndpointConnection> connectionQueue = 
connectionQueueMap.get(peer.getDescription());
+        if (connectionQueue == null) {
+            return false;
+        }
+
+        activeConnections.remove(endpointConnection);
+        if (shutdown) {
+            terminate(endpointConnection);
+            return false;
+        } else {
+            endpointConnection.setLastTimeUsed();
+            return connectionQueue.offer(endpointConnection);
+        }
+    }
+
     private void penalize(final PeerDescription peerDescription, final long 
penalizationMillis) {
         Long expiration = peerTimeoutExpirations.get(peerDescription);
-        if ( expiration == null ) {
+        if (expiration == null) {
             expiration = Long.valueOf(0L);
         }
-        
+
         final long newExpiration = Math.max(expiration, 
System.currentTimeMillis() + penalizationMillis);
         peerTimeoutExpirations.put(peerDescription, 
Long.valueOf(newExpiration));
     }
-    
+
     /**
-     * Updates internal state map to penalize a PeerStatus that points to the 
specified peer
-     * @param peer
+     * Updates internal state map to penalize a PeerStatus that points to the
+     * specified peer
+     *
+     * @param peer the peer
+     * @param penalizationMillis period of time to penalize a given peer
      */
     public void penalize(final Peer peer, final long penalizationMillis) {
         penalize(peer.getDescription(), penalizationMillis);
     }
-    
+
     private void cleanup(final SocketClientProtocol protocol, final Peer peer) 
{
-        if ( protocol != null && peer != null ) {
+        if (protocol != null && peer != null) {
             try {
                 protocol.shutdown(peer);
             } catch (final TransmissionDisabledException e) {
@@ -459,8 +459,8 @@ public class EndpointConnectionPool {
             } catch (IOException e1) {
             }
         }
-        
-        if ( peer != null ) {
+
+        if (peer != null) {
             try {
                 peer.close();
             } catch (final TransmissionDisabledException e) {
@@ -470,15 +470,14 @@ public class EndpointConnectionPool {
             }
         }
     }
-    
+
     private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
         return (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
     }
-    
-    
+
     private PeerStatus getNextPeerStatus(final TransferDirection direction) {
         List<PeerStatus> peerList = peerStatuses;
-        if ( isPeerRefreshNeeded(peerList) ) {
+        if (isPeerRefreshNeeded(peerList)) {
             peerRefreshLock.lock();
             try {
                 // now that we have the lock, check again that we need to 
refresh (because another thread
@@ -490,15 +489,15 @@ public class EndpointConnectionPool {
                     } catch (final Exception e) {
                         final String message = String.format("%s Failed to 
update list of peers due to %s", this, e.toString());
                         warn(message);
-                        if ( logger.isDebugEnabled() ) {
+                        if (logger.isDebugEnabled()) {
                             logger.warn("", e);
                         }
-                        
-                        if ( eventReporter != null ) {
-                               eventReporter.reportEvent(Severity.WARNING, 
CATEGORY, message);
+
+                        if (eventReporter != null) {
+                            eventReporter.reportEvent(Severity.WARNING, 
CATEGORY, message);
                         }
                     }
-                    
+
                     this.peerStatuses = peerList;
                     peerRefreshTime = System.currentTimeMillis();
                 }
@@ -507,46 +506,46 @@ public class EndpointConnectionPool {
             }
         }
 
-        if ( peerList == null || peerList.isEmpty() ) {
+        if (peerList == null || peerList.isEmpty()) {
             return null;
         }
 
         PeerStatus peerStatus;
-        for (int i=0; i < peerList.size(); i++) {
+        for (int i = 0; i < peerList.size(); i++) {
             final long idx = peerIndex.getAndIncrement();
             final int listIndex = (int) (idx % peerList.size());
             peerStatus = peerList.get(listIndex);
-            
-            if ( isPenalized(peerStatus) ) {
+
+            if (isPenalized(peerStatus)) {
                 logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
             } else {
                 return peerStatus;
             }
         }
-        
+
         logger.debug("{} All peers appear to be penalized; returning null", 
this);
         return null;
     }
-    
+
     private boolean isPenalized(final PeerStatus peerStatus) {
         final Long expirationEnd = 
peerTimeoutExpirations.get(peerStatus.getPeerDescription());
-        return (expirationEnd == null ? false : expirationEnd > 
System.currentTimeMillis() );
+        return (expirationEnd == null ? false : expirationEnd > 
System.currentTimeMillis());
     }
-    
+
     private List<PeerStatus> createPeerStatusList(final TransferDirection 
direction) throws IOException, HandshakeException, UnknownPortException, 
PortNotRunningException {
         Set<PeerStatus> statuses = getPeerStatuses();
-        if ( statuses == null ) {
+        if (statuses == null) {
             refreshPeers();
             statuses = getPeerStatuses();
-            if ( statuses == null ) {
+            if (statuses == null) {
                 logger.debug("{} found no peers to connect to", this);
                 return Collections.emptyList();
             }
         }
-        
+
         final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
         final List<NodeInformation> nodeInfos = new ArrayList<>();
-        for ( final PeerStatus peerStatus : statuses ) {
+        for (final PeerStatus peerStatus : statuses) {
             final PeerDescription description = 
peerStatus.getPeerDescription();
             final NodeInformation nodeInfo = new 
NodeInformation(description.getHostname(), description.getPort(), 0, 
description.isSecure(), peerStatus.getFlowFileCount());
             nodeInfos.add(nodeInfo);
@@ -554,8 +553,7 @@ public class EndpointConnectionPool {
         clusterNodeInfo.setNodeInformation(nodeInfos);
         return formulateDestinationList(clusterNodeInfo, direction);
     }
-    
-    
+
     private Set<PeerStatus> getPeerStatuses() {
         final PeerStatusCache cache = this.peerStatusCache;
         if (cache == null || cache.getStatuses() == null || 
cache.getStatuses().isEmpty()) {
@@ -576,14 +574,14 @@ public class EndpointConnectionPool {
     }
 
     private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, 
HandshakeException, UnknownPortException, PortNotRunningException {
-       final String hostname = clusterUrl.getHost();
+        final String hostname = clusterUrl.getHost();
         final Integer port = getSiteToSitePort();
-        if ( port == null ) {
+        if (port == null) {
             throw new IOException("Remote instance of NiFi is not configured 
to allow site-to-site communications");
         }
-       
+
         final PeerDescription clusterPeerDescription = new 
PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://";));
-       final CommunicationsSession commsSession = 
establishSiteToSiteConnection(hostname, port);
+        final CommunicationsSession commsSession = 
establishSiteToSiteConnection(hostname, port);
         final Peer peer = new Peer(clusterPeerDescription, commsSession, 
"nifi://" + hostname + ":" + port, clusterUrl.toString());
         final SocketClientProtocol clientProtocol = new SocketClientProtocol();
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
@@ -593,11 +591,11 @@ public class EndpointConnectionPool {
         clientProtocol.setTimeout(commsTimeout);
         if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
             String portId = getPortIdentifier(TransferDirection.RECEIVE);
-            if ( portId == null ) {
+            if (portId == null) {
                 portId = getPortIdentifier(TransferDirection.SEND);
             }
-            
-            if ( portId == null ) {
+
+            if (portId == null) {
                 peer.close();
                 throw new IOException("Failed to determine the identifier of 
port " + remoteDestination.getName());
             }
@@ -605,7 +603,7 @@ public class EndpointConnectionPool {
         } else {
             clientProtocol.handshake(peer, null);
         }
-        
+
         final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
         persistPeerStatuses(peerStatuses);
 
@@ -632,14 +630,13 @@ public class EndpointConnectionPool {
         return peerStatuses;
     }
 
-
     private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-       if ( peersFile == null ) {
-               return;
-       }
-       
+        if (peersFile == null) {
+            return;
+        }
+
         try (final OutputStream fos = new FileOutputStream(peersFile);
-             final OutputStream out = new BufferedOutputStream(fos)) {
+                final OutputStream out = new BufferedOutputStream(fos)) {
 
             for (final PeerStatus status : statuses) {
                 final PeerDescription description = 
status.getPeerDescription();
@@ -679,53 +676,52 @@ public class EndpointConnectionPool {
 
         return statuses;
     }
-    
-    
+
     private CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
         final PeerDescription description = peerStatus.getPeerDescription();
-       return establishSiteToSiteConnection(description.getHostname(), 
description.getPort());
+        return establishSiteToSiteConnection(description.getHostname(), 
description.getPort());
     }
-    
+
     private CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port) throws IOException {
-       final boolean siteToSiteSecure = isSecure();
+        final boolean siteToSiteSecure = isSecure();
         final String destinationUri = "nifi://" + hostname + ":" + port;
 
         CommunicationsSession commsSession = null;
         try {
-               if ( siteToSiteSecure ) {
-                   if ( sslContext == null ) {
-                       throw new IOException("Unable to communicate with " + 
hostname + ":" + port + " because it requires Secure Site-to-Site 
communications, but this instance is not configured for secure communications");
-                   }
-                   
-                   final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
-                   socketChannel.connect();
-           
-                   commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                       
-                       try {
-                           commsSession.setUserDn(socketChannel.getDn());
-                       } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
-                           throw new IOException(ex);
-                       }
-               } else {
-                   final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(hostname, port));
-                   commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
-               }
-       
-               
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-               commsSession.setUri(destinationUri);
+            if (siteToSiteSecure) {
+                if (sslContext == null) {
+                    throw new IOException("Unable to communicate with " + 
hostname + ":" + port
+                            + " because it requires Secure Site-to-Site 
communications, but this instance is not configured for secure communications");
+                }
+
+                final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
+                socketChannel.connect();
+
+                commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+
+                try {
+                    commsSession.setUserDn(socketChannel.getDn());
+                } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
+                    throw new IOException(ex);
+                }
+            } else {
+                final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(hostname, port));
+                commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
+            }
+
+            
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+            commsSession.setUri(destinationUri);
         } catch (final IOException ioe) {
-            if ( commsSession != null ) {
+            if (commsSession != null) {
                 commsSession.close();
             }
-            
+
             throw ioe;
         }
-        
+
         return commsSession;
     }
-    
-    
+
     static List<PeerStatus> formulateDestinationList(final 
ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
         final Collection<NodeInformation> nodeInfoSet = 
clusterNodeInfo.getNodeInformation();
         final int numDestinations = Math.max(128, nodeInfoSet.size());
@@ -743,26 +739,26 @@ public class EndpointConnectionPool {
             final double percentageOfFlowFiles = Math.min(0.8D, ((double) 
flowFileCount / (double) totalFlowFileCount));
             final double relativeWeighting = (direction == 
TransferDirection.SEND) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
             final int entries = Math.max(1, (int) (numDestinations * 
relativeWeighting));
-            
+
             entryCountMap.put(nodeInfo, Math.max(1, entries));
             totalEntries += entries;
         }
-        
+
         final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i=0; i < totalEntries; i++) {
+        for (int i = 0; i < totalEntries; i++) {
             destinations.add(null);
         }
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
+        for (final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet()) {
             final NodeInformation nodeInfo = entry.getKey();
             final int numEntries = entry.getValue();
-            
+
             int skipIndex = numEntries;
-            for (int i=0; i < numEntries; i++) {
+            for (int i = 0; i < numEntries; i++) {
                 int n = (skipIndex * i);
                 while (true) {
                     final int index = n % destinations.size();
                     PeerStatus status = destinations.get(index);
-                    if ( status == null ) {
+                    if (status == null) {
                         final PeerDescription description = new 
PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), 
nodeInfo.isSiteToSiteSecure());
                         status = new PeerStatus(description, 
nodeInfo.getTotalFlowFiles());
                         destinations.set(index, status);
@@ -776,7 +772,7 @@ public class EndpointConnectionPool {
 
         final StringBuilder distributionDescription = new StringBuilder();
         distributionDescription.append("New Weighted Distribution of Nodes:");
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
+        for (final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet()) {
             final double percentage = entry.getValue() * 100D / (double) 
destinations.size();
             
distributionDescription.append("\n").append(entry.getKey()).append(" will 
receive ").append(percentage).append("% of data");
         }
@@ -785,55 +781,54 @@ public class EndpointConnectionPool {
         // Jumble the list of destinations.
         return destinations;
     }
-    
-    
+
     private void cleanupExpiredSockets() {
-        for ( final BlockingQueue<EndpointConnection> connectionQueue : 
connectionQueueMap.values()) {
+        for (final BlockingQueue<EndpointConnection> connectionQueue : 
connectionQueueMap.values()) {
             final List<EndpointConnection> connections = new ArrayList<>();
-            
+
             EndpointConnection connection;
             while ((connection = connectionQueue.poll()) != null) {
                 // If the socket has not been used in 10 seconds, shut it down.
                 final long lastUsed = connection.getLastTimeUsed();
-                if ( lastUsed < System.currentTimeMillis() - 
idleExpirationMillis ) {
+                if (lastUsed < System.currentTimeMillis() - 
idleExpirationMillis) {
                     try {
                         
connection.getSocketClientProtocol().shutdown(connection.getPeer());
                     } catch (final Exception e) {
-                        logger.debug("Failed to shut down {} using {} due to 
{}", 
-                            new Object[] 
{connection.getSocketClientProtocol(), connection.getPeer(), e} );
+                        logger.debug("Failed to shut down {} using {} due to 
{}",
+                                new 
Object[]{connection.getSocketClientProtocol(), connection.getPeer(), e});
                     }
-                    
+
                     terminate(connection);
                 } else {
                     connections.add(connection);
                 }
             }
-            
+
             connectionQueue.addAll(connections);
         }
     }
-    
+
     public void shutdown() {
         shutdown = true;
-       taskExecutor.shutdown();
-       peerTimeoutExpirations.clear();
-        
-       for ( final EndpointConnection conn : activeConnections ) {
-           conn.getPeer().getCommunicationsSession().interrupt();
+        taskExecutor.shutdown();
+        peerTimeoutExpirations.clear();
+
+        for (final EndpointConnection conn : activeConnections) {
+            conn.getPeer().getCommunicationsSession().interrupt();
         }
 
-        for ( final BlockingQueue<EndpointConnection> connectionQueue : 
connectionQueueMap.values() ) {
+        for (final BlockingQueue<EndpointConnection> connectionQueue : 
connectionQueueMap.values()) {
             EndpointConnection state;
-            while ( (state = connectionQueue.poll()) != null)  {
+            while ((state = connectionQueue.poll()) != null) {
                 cleanup(state.getSocketClientProtocol(), state.getPeer());
             }
         }
     }
-    
+
     public void terminate(final EndpointConnection connection) {
         cleanup(connection.getSocketClientProtocol(), connection.getPeer());
     }
-    
+
     private void refreshPeers() {
         final PeerStatusCache existingCache = peerStatusCache;
         if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
@@ -851,69 +846,66 @@ public class EndpointConnectionPool {
             }
         }
     }
-    
-    
+
     public String getInputPortIdentifier(final String portName) throws 
IOException {
         return getPortIdentifier(portName, inputPortMap);
     }
-    
+
     public String getOutputPortIdentifier(final String portName) throws 
IOException {
-       return getPortIdentifier(portName, outputPortMap);
+        return getPortIdentifier(portName, outputPortMap);
     }
-    
-    
+
     private String getPortIdentifier(final String portName, final Map<String, 
String> portMap) throws IOException {
-       String identifier;
-       remoteInfoReadLock.lock();
+        String identifier;
+        remoteInfoReadLock.lock();
         try {
-               identifier = portMap.get(portName);
+            identifier = portMap.get(portName);
         } finally {
-               remoteInfoReadLock.unlock();
+            remoteInfoReadLock.unlock();
         }
-        
-        if ( identifier != null ) {
-               return identifier;
+
+        if (identifier != null) {
+            return identifier;
         }
-        
+
         refreshRemoteInfo();
 
-       remoteInfoReadLock.lock();
+        remoteInfoReadLock.lock();
         try {
-               return portMap.get(portName);
+            return portMap.get(portName);
         } finally {
-               remoteInfoReadLock.unlock();
+            remoteInfoReadLock.unlock();
         }
     }
-    
-    
+
     private ControllerDTO refreshRemoteInfo() throws IOException {
-       final boolean webInterfaceSecure = 
clusterUrl.toString().startsWith("https");
+        final boolean webInterfaceSecure = 
clusterUrl.toString().startsWith("https");
         final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? 
sslContext : null);
-               final ControllerDTO controller = utils.getController(apiUri + 
"/controller", commsTimeout);
-        
+        final ControllerDTO controller = utils.getController(apiUri + 
"/controller", commsTimeout);
+
         remoteInfoWriteLock.lock();
         try {
             this.siteToSitePort = controller.getRemoteSiteListeningPort();
             this.siteToSiteSecure = controller.isSiteToSiteSecure();
-            
+
             inputPortMap.clear();
             for (final PortDTO inputPort : controller.getInputPorts()) {
-               inputPortMap.put(inputPort.getName(), inputPort.getId());
+                inputPortMap.put(inputPort.getName(), inputPort.getId());
             }
-            
+
             outputPortMap.clear();
-            for ( final PortDTO outputPort : controller.getOutputPorts()) {
-               outputPortMap.put(outputPort.getName(), outputPort.getId());
+            for (final PortDTO outputPort : controller.getOutputPorts()) {
+                outputPortMap.put(outputPort.getName(), outputPort.getId());
             }
-            
+
             this.remoteRefreshTime = System.currentTimeMillis();
         } finally {
-               remoteInfoWriteLock.unlock();
+            remoteInfoWriteLock.unlock();
         }
-        
+
         return controller;
     }
-    
+
     /**
      * @return the port that the remote instance is listening on for
      * site-to-site communication, or <code>null</code> if the remote instance
@@ -930,7 +922,7 @@ public class EndpointConnectionPool {
                 return listeningPort;
             }
         } finally {
-               remoteInfoReadLock.unlock();
+            remoteInfoReadLock.unlock();
         }
 
         final ControllerDTO controller = refreshRemoteInfo();
@@ -938,19 +930,16 @@ public class EndpointConnectionPool {
 
         return listeningPort;
     }
- 
+
     @Override
     public String toString() {
         return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
     }
-    
-    
+
     /**
-     * Returns {@code true} if the remote instance is configured for secure 
site-to-site communications,
-     * {@code false} otherwise.
-     * 
-     * @return
-     * @throws IOException
+     * @return {@code true} if the remote instance is configured for secure
+     * site-to-site communications, {@code false} otherwise
+     * @throws IOException if unable to check if secure
      */
     public boolean isSecure() throws IOException {
         remoteInfoReadLock.lock();
@@ -960,23 +949,23 @@ public class EndpointConnectionPool {
                 return secure;
             }
         } finally {
-               remoteInfoReadLock.unlock();
+            remoteInfoReadLock.unlock();
         }
 
         final ControllerDTO controller = refreshRemoteInfo();
         final Boolean isSecure = controller.isSiteToSiteSecure();
-        if ( isSecure == null ) {
+        if (isSecure == null) {
             throw new IOException("Remote NiFi instance " + clusterUrl + " is 
not currently configured to accept site-to-site connections");
         }
-        
+
         return isSecure;
     }
-    
-    
+
     private class IdEnrichedRemoteDestination implements RemoteDestination {
+
         private final RemoteDestination original;
         private final String identifier;
-        
+
         public IdEnrichedRemoteDestination(final RemoteDestination original, 
final String identifier) {
             this.original = original;
             this.identifier = identifier;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 4aab3f7..33e4a66 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -33,71 +33,71 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketClient implements SiteToSiteClient {
+
     private static final Logger logger = 
LoggerFactory.getLogger(SocketClient.class);
-    
+
     private final SiteToSiteClientConfig config;
-       private final EndpointConnectionPool pool;
-       private final boolean compress;
-       private final String portName;
-       private final long penalizationNanos;
-       private volatile String portIdentifier;
-       private volatile boolean closed = false;
-       
-       public SocketClient(final SiteToSiteClientConfig config) {
-               pool = new EndpointConnectionPool(config.getUrl(), 
-                       createRemoteDestination(config.getPortIdentifier(), 
config.getPortName()),
-                       (int) config.getTimeout(TimeUnit.MILLISECONDS),
-                       (int) 
config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
-                               config.getSslContext(), 
config.getEventReporter(), config.getPeerPersistenceFile());
-               
-               this.config = config;
-               this.compress = config.isUseCompression();
-               this.portIdentifier = config.getPortIdentifier();
-               this.portName = config.getPortName();
-               this.penalizationNanos = 
config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
-       }
-       
-       @Override
-       public SiteToSiteClientConfig getConfig() {
-           return config;
-       }
-       
-       @Override
-       public boolean isSecure() throws IOException {
-               return pool.isSecure();
-       }
-       
-       private String getPortIdentifier(final TransferDirection direction) 
throws IOException {
-               final String id = this.portIdentifier;
-               if ( id != null ) {
-                       return id;
-               }
-               
-               final String portId;
-               if ( direction == TransferDirection.SEND ) {
-                       portId = pool.getInputPortIdentifier(this.portName);
-               } else {
-                       portId = pool.getOutputPortIdentifier(this.portName);
-               }
-               
-               if (portId == null) {
-                   logger.debug("Unable to resolve port [{}] to an 
identifier", portName);
-               } else {
-                   logger.debug("Resolved port [{}] to identifier [{}]", 
portName, portId);
-                   this.portIdentifier = portId;
-               }
-               
-               return portId;
-       }
-       
-       
-       private RemoteDestination createRemoteDestination(final String portId, 
final String portName) {
-           return new RemoteDestination() {
+    private final EndpointConnectionPool pool;
+    private final boolean compress;
+    private final String portName;
+    private final long penalizationNanos;
+    private volatile String portIdentifier;
+    private volatile boolean closed = false;
+
+    public SocketClient(final SiteToSiteClientConfig config) {
+        pool = new EndpointConnectionPool(config.getUrl(),
+                createRemoteDestination(config.getPortIdentifier(), 
config.getPortName()),
+                (int) config.getTimeout(TimeUnit.MILLISECONDS),
+                (int) 
config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
+                config.getSslContext(), config.getEventReporter(), 
config.getPeerPersistenceFile());
+
+        this.config = config;
+        this.compress = config.isUseCompression();
+        this.portIdentifier = config.getPortIdentifier();
+        this.portName = config.getPortName();
+        this.penalizationNanos = 
config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public SiteToSiteClientConfig getConfig() {
+        return config;
+    }
+
+    @Override
+    public boolean isSecure() throws IOException {
+        return pool.isSecure();
+    }
+
+    private String getPortIdentifier(final TransferDirection direction) throws 
IOException {
+        final String id = this.portIdentifier;
+        if (id != null) {
+            return id;
+        }
+
+        final String portId;
+        if (direction == TransferDirection.SEND) {
+            portId = pool.getInputPortIdentifier(this.portName);
+        } else {
+            portId = pool.getOutputPortIdentifier(this.portName);
+        }
+
+        if (portId == null) {
+            logger.debug("Unable to resolve port [{}] to an identifier", 
portName);
+        } else {
+            logger.debug("Resolved port [{}] to identifier [{}]", portName, 
portId);
+            this.portIdentifier = portId;
+        }
+
+        return portId;
+    }
+
+    private RemoteDestination createRemoteDestination(final String portId, 
final String portName) {
+        return new RemoteDestination() {
             @Override
             public String getIdentifier() {
                 return portId;
             }
-            
+
             @Override
             public String getName() {
                 return portName;
@@ -113,113 +113,112 @@ public class SocketClient implements SiteToSiteClient {
                 return compress;
             }
         };
-       }
-       
-       @Override
-       public Transaction createTransaction(final TransferDirection direction) 
throws IOException {
-           if ( closed ) {
-               throw new IllegalStateException("Client is closed");
-           }
-               final String portId = getPortIdentifier(direction);
-               
-               if ( portId == null ) {
-                       throw new IOException("Could not find Port with name '" 
+ portName + "' for remote NiFi instance");
-               }
-               
-               final EndpointConnection connectionState = 
pool.getEndpointConnection(direction, getConfig());
-               if ( connectionState == null ) {
-                   return null;
-               }
-               
-               final Transaction transaction;
-               try {
-                       transaction = 
connectionState.getSocketClientProtocol().startTransaction(
-                               connectionState.getPeer(), 
connectionState.getCodec(), direction);
-               } catch (final Throwable t) {
-                       pool.terminate(connectionState);
-                       throw new IOException("Unable to create Transaction to 
communicate with " + connectionState.getPeer(), t);
-               }
-               
-               // Wrap the transaction in a new one that will return the 
EndpointConnectionState back to the pool whenever
-               // the transaction is either completed or canceled.
-               final ObjectHolder<EndpointConnection> connectionStateRef = new 
ObjectHolder<>(connectionState);
-               return new Transaction() {
-                       @Override
-                       public void confirm() throws IOException {
-                               transaction.confirm();
-                       }
-
-                       @Override
-                       public TransactionCompletion complete() throws 
IOException {
-                               try {
-                                       return transaction.complete();
-                               } finally {
-                                   final EndpointConnection state = 
connectionStateRef.get();
-                                   if ( state != null ) {
-                                       pool.offer(connectionState);
-                                       connectionStateRef.set(null);
-                                   }
-                               }
-                       }
-
-                       @Override
-                       public void cancel(final String explanation) throws 
IOException {
-                               try {
-                                       transaction.cancel(explanation);
-                               } finally {
+    }
+
+    @Override
+    public Transaction createTransaction(final TransferDirection direction) 
throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+        final String portId = getPortIdentifier(direction);
+
+        if (portId == null) {
+            throw new IOException("Could not find Port with name '" + portName 
+ "' for remote NiFi instance");
+        }
+
+        final EndpointConnection connectionState = 
pool.getEndpointConnection(direction, getConfig());
+        if (connectionState == null) {
+            return null;
+        }
+
+        final Transaction transaction;
+        try {
+            transaction = 
connectionState.getSocketClientProtocol().startTransaction(
+                    connectionState.getPeer(), connectionState.getCodec(), 
direction);
+        } catch (final Throwable t) {
+            pool.terminate(connectionState);
+            throw new IOException("Unable to create Transaction to communicate 
with " + connectionState.getPeer(), t);
+        }
+
+        // Wrap the transaction in a new one that will return the 
EndpointConnectionState back to the pool whenever
+        // the transaction is either completed or canceled.
+        final ObjectHolder<EndpointConnection> connectionStateRef = new 
ObjectHolder<>(connectionState);
+        return new Transaction() {
+            @Override
+            public void confirm() throws IOException {
+                transaction.confirm();
+            }
+
+            @Override
+            public TransactionCompletion complete() throws IOException {
+                try {
+                    return transaction.complete();
+                } finally {
                     final EndpointConnection state = connectionStateRef.get();
-                    if ( state != null ) {
+                    if (state != null) {
+                        pool.offer(connectionState);
+                        connectionStateRef.set(null);
+                    }
+                }
+            }
+
+            @Override
+            public void cancel(final String explanation) throws IOException {
+                try {
+                    transaction.cancel(explanation);
+                } finally {
+                    final EndpointConnection state = connectionStateRef.get();
+                    if (state != null) {
                         pool.terminate(connectionState);
                         connectionStateRef.set(null);
                     }
-                               }
-                       }
-
-                       @Override
-                       public void error() {
-                           try {
-                               transaction.error();
-                           } finally {
+                }
+            }
+
+            @Override
+            public void error() {
+                try {
+                    transaction.error();
+                } finally {
                     final EndpointConnection state = connectionStateRef.get();
-                    if ( state != null ) {
+                    if (state != null) {
                         pool.terminate(connectionState);
                         connectionStateRef.set(null);
                     }
-                           }
-                       }
-                       
-                       @Override
-                       public void send(final DataPacket dataPacket) throws 
IOException {
-                               transaction.send(dataPacket);
-                       }
-
-                       @Override
-                       public void send(final byte[] content, final 
Map<String, String> attributes) throws IOException {
-                           transaction.send(content, attributes);
-                       }
-                       
-                       @Override
-                       public DataPacket receive() throws IOException {
-                               return transaction.receive();
-                       }
-
-                       @Override
-                       public TransactionState getState() throws IOException {
-                               return transaction.getState();
-                       }
-
-                       @Override
-                       public Communicant getCommunicant() {
-                           return transaction.getCommunicant();
-                       }
-               };
-       }
-
-       
-       @Override
-       public void close() throws IOException {
-           closed = true;
-               pool.shutdown();
-       }
-       
+                }
+            }
+
+            @Override
+            public void send(final DataPacket dataPacket) throws IOException {
+                transaction.send(dataPacket);
+            }
+
+            @Override
+            public void send(final byte[] content, final Map<String, String> 
attributes) throws IOException {
+                transaction.send(content, attributes);
+            }
+
+            @Override
+            public DataPacket receive() throws IOException {
+                return transaction.receive();
+            }
+
+            @Override
+            public TransactionState getState() throws IOException {
+                return transaction.getState();
+            }
+
+            @Override
+            public Communicant getCommunicant() {
+                return transaction.getCommunicant();
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        closed = true;
+        pool.shutdown();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
index 1380e1b..e79fc47 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
@@ -38,13 +38,13 @@ public interface FlowFileCodec extends 
VersionedRemoteResource {
      * Returns a List of all versions that this codec is able to support, in 
the
      * order that they are preferred by the codec
      *
-     * @return
+     * @return all supported versions
      */
     public List<Integer> getSupportedVersions();
 
     /**
-     * Encodes a DataPacket and its content as a single stream of data and 
writes
-     * that stream to the output.
+     * Encodes a DataPacket and its content as a single stream of data and
+     * writes that stream to the output.
      *
      * @param dataPacket the data to serialize
      * @param outStream the stream to write the data to
@@ -58,12 +58,13 @@ public interface FlowFileCodec extends 
VersionedRemoteResource {
      * Decodes the contents of the InputStream, interpreting the data to
      * determine the next DataPacket's attributes and content.
      *
-     * @param stream an InputStream containing DataPacket's content and 
attributes
+     * @param stream an InputStream containing DataPacket's content and
+     * attributes
      *
-     * @return the DataPacket that was created, or <code>null</code> if the 
stream
-     * was out of data
+     * @return the DataPacket that was created, or <code>null</code> if the
+     * stream was out of data
      *
-     * @throws IOException
+     * @throws IOException if unable to read stream
      * @throws ProtocolException if the input is malformed
      * @throws TransmissionDisabledException if a user terminates the 
connection
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index 6fd92de..0bee537 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -34,7 +34,8 @@ import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.stream.io.StreamUtils;
 
 public class StandardFlowFileCodec implements FlowFileCodec {
-       public static final int MAX_NUM_ATTRIBUTES = 25000;
+
+    public static final int MAX_NUM_ATTRIBUTES = 25000;
 
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
@@ -43,30 +44,29 @@ public class StandardFlowFileCodec implements FlowFileCodec 
{
     public StandardFlowFileCodec() {
         versionNegotiator = new StandardVersionNegotiator(1);
     }
-    
+
     @Override
     public void encode(final DataPacket dataPacket, final OutputStream 
encodedOut) throws IOException {
         final DataOutputStream out = new DataOutputStream(encodedOut);
-        
+
         final Map<String, String> attributes = dataPacket.getAttributes();
         out.writeInt(attributes.size());
-        for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
+        for (final Map.Entry<String, String> entry : attributes.entrySet()) {
             writeString(entry.getKey(), out);
             writeString(entry.getValue(), out);
         }
-        
+
         out.writeLong(dataPacket.getSize());
-        
+
         final InputStream in = dataPacket.getData();
         StreamUtils.copy(in, encodedOut);
         encodedOut.flush();
     }
 
-    
     @Override
     public DataPacket decode(final InputStream stream) throws IOException, 
ProtocolException {
         final DataInputStream in = new DataInputStream(stream);
-        
+
         final int numAttributes;
         try {
             numAttributes = in.readInt();
@@ -74,22 +74,22 @@ public class StandardFlowFileCodec implements FlowFileCodec 
{
             // we're out of data.
             return null;
         }
-        
+
         // This is here because if the stream is not properly formed, we could 
get up to Integer.MAX_VALUE attributes, which will
         // generally result in an OutOfMemoryError.
-        if ( numAttributes > MAX_NUM_ATTRIBUTES ) {
-               throw new ProtocolException("FlowFile exceeds maximum number of 
attributes with a total of " + numAttributes);
+        if (numAttributes > MAX_NUM_ATTRIBUTES) {
+            throw new ProtocolException("FlowFile exceeds maximum number of 
attributes with a total of " + numAttributes);
         }
-        
+
         final Map<String, String> attributes = new HashMap<>(numAttributes);
-        for (int i=0; i < numAttributes; i++) {
+        for (int i = 0; i < numAttributes; i++) {
             final String attrName = readString(in);
             final String attrValue = readString(in);
             attributes.put(attrName, attrValue);
         }
-        
+
         final long numBytes = in.readLong();
-        
+
         return new StandardDataPacket(attributes, stream, numBytes);
     }
 
@@ -99,14 +99,13 @@ public class StandardFlowFileCodec implements FlowFileCodec 
{
         out.write(bytes);
     }
 
-    
     private String readString(final DataInputStream in) throws IOException {
         final int numBytes = in.readInt();
         final byte[] bytes = new byte[numBytes];
         StreamUtils.fillBuffer(in, bytes, true);
         return new String(bytes, "UTF-8");
     }
-    
+
     @Override
     public List<Integer> getSupportedVersions() {
         return versionNegotiator.getSupportedVersions();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
index d4d55e1..198aaef 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -18,13 +18,14 @@ package org.apache.nifi.remote.exception;
 
 import java.io.IOException;
 
-
 /**
- * A HandshakeException occurs when the client and the remote NiFi instance do 
not agree
- * on some condition during the handshake. For example, if the NiFi instance 
does not recognize
- * one of the parameters that the client passes during the Handshaking phase.
+ * A HandshakeException occurs when the client and the remote NiFi instance do
+ * not agree on some condition during the handshake. For example, if the NiFi
+ * instance does not recognize one of the parameters that the client passes
+ * during the Handshaking phase.
  */
 public class HandshakeException extends IOException {
+
     private static final long serialVersionUID = 178192341908726L;
 
     public HandshakeException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
index 8b97832..09fc05c 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -17,11 +17,12 @@
 package org.apache.nifi.remote.exception;
 
 /**
- * PortNotRunningException occurs when the remote NiFi instance reports
- * that the Port that the client is attempting to communicate with is not
- * currently running and therefore communications with that Port are not 
allowed.
+ * PortNotRunningException occurs when the remote NiFi instance reports that 
the
+ * Port that the client is attempting to communicate with is not currently
+ * running and therefore communications with that Port are not allowed.
  */
 public class PortNotRunningException extends ProtocolException {
+
     private static final long serialVersionUID = -2790940982005516375L;
 
     public PortNotRunningException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index 45a4e15..cc6ae50 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -19,8 +19,8 @@ package org.apache.nifi.remote.exception;
 import java.io.IOException;
 
 /**
- * A ProtocolException occurs when unexpected data is received, for example
- * an invalid Response Code.
+ * A ProtocolException occurs when unexpected data is received, for example an
+ * invalid Response Code.
  */
 public class ProtocolException extends IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
index 592a1b3..4249075 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -17,10 +17,12 @@
 package org.apache.nifi.remote.exception;
 
 /**
- * An UnknownPortException indicates that the remote NiFi instance has 
reported that
- * the endpoint that the client attempted to communicate with does not exist.
+ * An UnknownPortException indicates that the remote NiFi instance has reported
+ * that the endpoint that the client attempted to communicate with does not
+ * exist.
  */
 public class UnknownPortException extends ProtocolException {
+
     private static final long serialVersionUID = -2790940982005516375L;
 
     public UnknownPortException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
index 8065f57..6180c3c 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -22,11 +22,12 @@ import java.nio.channels.SocketChannel;
 import org.apache.nifi.remote.AbstractCommunicationsSession;
 
 public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSession {
+
     private final SocketChannel channel;
     private final SocketChannelInput request;
     private final SocketChannelOutput response;
     private int timeout = 30000;
-    
+
     public SocketChannelCommunicationsSession(final SocketChannel 
socketChannel, final String uri) throws IOException {
         super(uri);
         request = new SocketChannelInput(socketChannel);
@@ -34,12 +35,12 @@ public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSe
         channel = socketChannel;
         socketChannel.configureBlocking(false);
     }
-    
+
     @Override
     public boolean isClosed() {
         return !channel.isConnected();
     }
-    
+
     @Override
     public SocketChannelInput getInput() {
         return request;
@@ -65,28 +66,28 @@ public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSe
     @Override
     public void close() throws IOException {
         IOException suppressed = null;
-        
+
         try {
             request.consume();
         } catch (final IOException ioe) {
             suppressed = ioe;
         }
-        
+
         try {
             channel.close();
         } catch (final IOException ioe) {
-            if ( suppressed != null ) {
+            if (suppressed != null) {
                 ioe.addSuppressed(suppressed);
             }
-            
+
             throw ioe;
         }
-        
-        if ( suppressed != null ) {
+
+        if (suppressed != null) {
             throw suppressed;
         }
     }
-    
+
     @Override
     public boolean isDataAvailable() {
         return request.isDataAvailable();
@@ -101,7 +102,7 @@ public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSe
     public long getBytesRead() {
         return request.getBytesRead();
     }
-    
+
     @Override
     public void interrupt() {
         request.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 7dffddd..68a8dc4 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -26,18 +26,19 @@ import org.apache.nifi.remote.io.InterruptableInputStream;
 import org.apache.nifi.remote.protocol.CommunicationsInput;
 
 public class SocketChannelInput implements CommunicationsInput {
+
     private final SocketChannelInputStream socketIn;
     private final ByteCountingInputStream countingIn;
     private final InputStream bufferedIn;
     private final InterruptableInputStream interruptableIn;
-    
+
     public SocketChannelInput(final SocketChannel socketChannel) throws 
IOException {
         this.socketIn = new SocketChannelInputStream(socketChannel);
         countingIn = new ByteCountingInputStream(socketIn);
         bufferedIn = new BufferedInputStream(countingIn);
         interruptableIn = new InterruptableInputStream(bufferedIn);
     }
-    
+
     @Override
     public InputStream getInputStream() throws IOException {
         return interruptableIn;
@@ -46,7 +47,7 @@ public class SocketChannelInput implements 
CommunicationsInput {
     public void setTimeout(final int millis) {
         socketIn.setTimeout(millis);
     }
-    
+
     public boolean isDataAvailable() {
         try {
             return interruptableIn.available() > 0;
@@ -54,12 +55,12 @@ public class SocketChannelInput implements 
CommunicationsInput {
             return false;
         }
     }
-    
+
     @Override
     public long getBytesRead() {
         return countingIn.getBytesRead();
     }
-    
+
     public void interrupt() {
         interruptableIn.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
index 26c0164..13974a5 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
@@ -26,32 +26,33 @@ import org.apache.nifi.remote.io.InterruptableOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsOutput;
 
 public class SocketChannelOutput implements CommunicationsOutput {
+
     private final SocketChannelOutputStream socketOutStream;
     private final ByteCountingOutputStream countingOut;
     private final OutputStream bufferedOut;
     private final InterruptableOutputStream interruptableOut;
-    
+
     public SocketChannelOutput(final SocketChannel socketChannel) throws 
IOException {
         socketOutStream = new SocketChannelOutputStream(socketChannel);
         countingOut = new ByteCountingOutputStream(socketOutStream);
         bufferedOut = new BufferedOutputStream(countingOut);
         interruptableOut = new InterruptableOutputStream(bufferedOut);
     }
-    
+
     @Override
     public OutputStream getOutputStream() throws IOException {
         return interruptableOut;
     }
-    
+
     public void setTimeout(final int timeout) {
         socketOutStream.setTimeout(timeout);
     }
-    
+
     @Override
     public long getBytesWritten() {
         return countingOut.getBytesWritten();
     }
-    
+
     public void interrupt() {
         interruptableOut.interrupt();
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
index 50e9162..5e5abc7 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -21,17 +21,18 @@ import java.io.IOException;
 import org.apache.nifi.remote.AbstractCommunicationsSession;
 
 public class SSLSocketChannelCommunicationsSession extends 
AbstractCommunicationsSession {
+
     private final SSLSocketChannel channel;
     private final SSLSocketChannelInput request;
     private final SSLSocketChannelOutput response;
-    
+
     public SSLSocketChannelCommunicationsSession(final SSLSocketChannel 
channel, final String uri) {
         super(uri);
         request = new SSLSocketChannelInput(channel);
         response = new SSLSocketChannelOutput(channel);
         this.channel = channel;
     }
-    
+
     @Override
     public SSLSocketChannelInput getInput() {
         return request;
@@ -55,33 +56,33 @@ public class SSLSocketChannelCommunicationsSession extends 
AbstractCommunication
     @Override
     public void close() throws IOException {
         IOException suppressed = null;
-        
+
         try {
             request.consume();
         } catch (final IOException ioe) {
             suppressed = ioe;
         }
-        
+
         try {
             channel.close();
         } catch (final IOException ioe) {
-            if ( suppressed != null ) {
+            if (suppressed != null) {
                 ioe.addSuppressed(suppressed);
             }
-            
+
             throw ioe;
         }
-        
-        if ( suppressed != null ) {
+
+        if (suppressed != null) {
             throw suppressed;
         }
     }
-    
+
     @Override
     public boolean isClosed() {
         return channel.isClosed();
     }
-    
+
     @Override
     public boolean isDataAvailable() {
         try {
@@ -105,7 +106,7 @@ public class SSLSocketChannelCommunicationsSession extends 
AbstractCommunication
     public void interrupt() {
         channel.interrupt();
     }
-    
+
     @Override
     public String toString() {
         return super.toString() + "[SSLSocketChannel=" + channel + "]";

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
index 01fb9f2..6cd2344 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
@@ -24,25 +24,26 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.remote.protocol.CommunicationsInput;
 
 public class SSLSocketChannelInput implements CommunicationsInput {
+
     private final SSLSocketChannelInputStream in;
     private final ByteCountingInputStream countingIn;
     private final InputStream bufferedIn;
-    
+
     public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
         in = new SSLSocketChannelInputStream(socketChannel);
         countingIn = new ByteCountingInputStream(in);
         this.bufferedIn = new BufferedInputStream(countingIn);
     }
-    
+
     @Override
     public InputStream getInputStream() throws IOException {
         return bufferedIn;
     }
-    
+
     public boolean isDataAvailable() throws IOException {
         return bufferedIn.available() > 0;
     }
-    
+
     @Override
     public long getBytesRead() {
         return countingIn.getBytesRead();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
index dc3d68f..33d13cb 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -24,9 +24,10 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsOutput;
 
 public class SSLSocketChannelOutput implements CommunicationsOutput {
+
     private final OutputStream out;
     private final ByteCountingOutputStream countingOut;
-    
+
     public SSLSocketChannelOutput(final SSLSocketChannel channel) {
         countingOut = new ByteCountingOutputStream(new 
SSLSocketChannelOutputStream(channel));
         out = new BufferedOutputStream(countingOut);
@@ -36,7 +37,7 @@ public class SSLSocketChannelOutput implements 
CommunicationsOutput {
     public OutputStream getOutputStream() throws IOException {
         return out;
     }
-    
+
     @Override
     public long getBytesWritten() {
         return countingOut.getBytesWritten();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 36a0e8d..2efea11 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -48,37 +48,27 @@ public interface ClientProtocol extends 
VersionedRemoteResource {
 
     boolean isReadyForFileTransfer();
 
-    
-    
-    
     Transaction startTransaction(Peer peer, FlowFileCodec codec, 
TransferDirection direction) throws IOException;
-    
-    
+
     /**
-     * returns <code>true</code> if remote instance indicates that the port is
+     * @return <code>true</code> if remote instance indicates that the port is
      * invalid
-     *
-     * @return
      * @throws IllegalStateException if a handshake has not successfully
      * completed
      */
     boolean isPortInvalid() throws IllegalStateException;
 
     /**
-     * returns <code>true</code> if remote instance indicates that the port is
+     * @return <code>true</code> if remote instance indicates that the port is
      * unknown
-     *
-     * @return
      * @throws IllegalStateException if a handshake has not successfully
      * completed
      */
     boolean isPortUnknown();
 
     /**
-     * returns <code>true</code> if remote instance indicates that the port's
+     * @return <code>true</code> if remote instance indicates that the port's
      * destination is full
-     *
-     * @return
      * @throws IllegalStateException if a handshake has not successfully
      * completed
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
index 5e56902..3fa3e96 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -23,10 +23,11 @@ public interface CommunicationsInput {
 
     /**
      * Reads all data currently on the socket and throws it away
-     * @throws IOException
+     *
+     * @throws IOException if unable to consume
      */
     void consume() throws IOException;
-    
+
     InputStream getInputStream() throws IOException;
 
     long getBytesRead();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b612b6bc/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
index d009cec..aff73ba 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -55,10 +55,8 @@ public interface CommunicationsSession extends Closeable {
     void interrupt();
 
     /**
-     * Returns <code>true</code> if the connection is closed, 
<code>false</code>
-     * otherwise.
-     *
-     * @return
+     * @return <code>true</code> if the connection is closed, 
<code>false</code>
+     * otherwise
      */
     boolean isClosed();
 }

Reply via email to