http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 2b27de2..b0ce357 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -28,22 +28,22 @@ import org.apache.nifi.remote.protocol.ServerProtocol;
 
 public class RemoteResourceFactory extends RemoteResourceInitiator {
 
-       @SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     public static <T extends FlowFileCodec> T receiveCodecNegotiation(final 
DataInputStream dis, final DataOutputStream dos) throws IOException, 
HandshakeException {
         final String codecName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T codec = (T) RemoteResourceManager.createCodec(codecName, 
version);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return codec;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an 
acceptable version of the FlowFileCodec " + codecName);
@@ -51,36 +51,36 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveCodecNegotiation(dis, dos);
         }
-       }
-       
-       public static void rejectCodecNegotiation(final DataInputStream dis, 
final DataOutputStream dos, final String explanation) throws IOException {
-               dis.readUTF();  // read codec name
-               dis.readInt();  // read codec version
-               
-               dos.write(ABORT);
-               dos.writeUTF(explanation);
-               dos.flush();
-       }
-       
-       @SuppressWarnings("unchecked")
+    }
+
+    public static void rejectCodecNegotiation(final DataInputStream dis, final 
DataOutputStream dos, final String explanation) throws IOException {
+        dis.readUTF();  // read codec name
+        dis.readInt();  // read codec version
+
+        dos.write(ABORT);
+        dos.writeUTF(explanation);
+        dos.flush();
+    }
+
+    @SuppressWarnings("unchecked")
     public static <T extends ClientProtocol> T 
receiveClientProtocolNegotiation(final DataInputStream dis, final 
DataOutputStream dos) throws IOException, HandshakeException {
         final String protocolName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T protocol = (T) 
RemoteResourceManager.createClientProtocol(protocolName);
         final VersionNegotiator negotiator = protocol.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return protocol;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an 
acceptable version of the ClientProtocol " + protocolName);
@@ -88,28 +88,27 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveClientProtocolNegotiation(dis, dos);
         }
     }
-    
-       
-       @SuppressWarnings("unchecked")
+
+    @SuppressWarnings("unchecked")
     public static <T extends ServerProtocol> T 
receiveServerProtocolNegotiation(final DataInputStream dis, final 
DataOutputStream dos) throws IOException, HandshakeException {
-           final String protocolName = dis.readUTF();
+        final String protocolName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T protocol = (T) 
RemoteResourceManager.createServerProtocol(protocolName);
         final VersionNegotiator negotiator = protocol.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return protocol;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an 
acceptable version of the ServerProtocol " + protocolName);
@@ -117,54 +116,53 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveServerProtocolNegotiation(dis, dos);
         }
     }
-    
-       
-       
-       
-       public static <T extends VersionedRemoteResource> T 
receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final 
DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] 
constructorArgs) throws IOException, HandshakeException {
-               final String resourceClassName = dis.readUTF();
-               final T resource;
-               try {
+
+    public static <T extends VersionedRemoteResource> T
+        receiveResourceNegotiation(final Class<T> cls, final DataInputStream 
dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final 
Object[] constructorArgs)
+                throws IOException, HandshakeException {
+        final String resourceClassName = dis.readUTF();
+        final T resource;
+        try {
             @SuppressWarnings("unchecked")
-                       final Class<T> resourceClass = (Class<T>) 
Class.forName(resourceClassName);
-            if ( !cls.isAssignableFrom(resourceClass) ) {
-               throw new HandshakeException("Expected to negotiate a Versioned 
Resource of type " + cls.getName() + " but received class name of " + 
resourceClassName);
+            final Class<T> resourceClass = (Class<T>) 
Class.forName(resourceClassName);
+            if (!cls.isAssignableFrom(resourceClass)) {
+                throw new HandshakeException("Expected to negotiate a 
Versioned Resource of type " + cls.getName() + " but received class name of " + 
resourceClassName);
             }
-            
+
             final Constructor<T> ctr = 
resourceClass.getConstructor(constructorArgClasses);
             resource = ctr.newInstance(constructorArgs);
         } catch (final Throwable t) {
-               dos.write(ABORT);
-               final String errorMsg = "Unable to instantiate Versioned 
Resource of type " + resourceClassName;
-               dos.writeUTF(errorMsg);
-               dos.flush();
-               throw new HandshakeException(errorMsg);
+            dos.write(ABORT);
+            final String errorMsg = "Unable to instantiate Versioned Resource 
of type " + resourceClassName;
+            dos.writeUTF(errorMsg);
+            dos.flush();
+            throw new HandshakeException(errorMsg);
         }
-               
+
         final int version = dis.readInt();
         final VersionNegotiator negotiator = resource.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return resource;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
-               dos.write(ABORT);
-               dos.flush();
-               throw new HandshakeException("Unable to negotiate an acceptable 
version of the resource " + resourceClassName);
+            if (preferred == null) {
+                dos.write(ABORT);
+                dos.flush();
+                throw new HandshakeException("Unable to negotiate an 
acceptable version of the resource " + resourceClassName);
             }
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveResourceNegotiation(cls, dis, dos, 
constructorArgClasses, constructorArgs);
         }
-       }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
index f86f066..8bbe7aa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
@@ -34,20 +34,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RemoteResourceManager {
+
     private static final Map<String, Class<? extends FlowFileCodec>> 
codecClassMap;
     private static final Map<String, Class<? extends ServerProtocol>> 
desiredServerProtocolClassMap = new ConcurrentHashMap<>();
     private static final Map<String, Class<? extends ClientProtocol>> 
desiredClientProtocolClassMap = new ConcurrentHashMap<>();
-    
+
     private static final Map<String, Set<Class<? extends ServerProtocol>>> 
serverProtocolClassMap;
     private static final Map<String, Set<Class<? extends ClientProtocol>>> 
clientProtocolClassMap;
-    
+
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteResourceManager.class);
-    
+
     static {
         final Map<String, Class<? extends FlowFileCodec>> codecMap = new 
HashMap<>();
         final Map<String, Set<Class<? extends ServerProtocol>>> 
serverProtocolMap = new HashMap<>();
         final Map<String, Set<Class<? extends ClientProtocol>>> 
clientProtocolMap = new HashMap<>();
-        
+
         // load all of the FlowFileCodecs that we know
         final ClassLoader classLoader = 
RemoteResourceManager.class.getClassLoader();
         final ServiceLoader<FlowFileCodec> flowFileCodecLoader = 
ServiceLoader.load(FlowFileCodec.class, classLoader);
@@ -58,12 +59,12 @@ public class RemoteResourceManager {
             final String codecName = codec.getResourceName();
 
             final Class<? extends FlowFileCodec> previousValue = 
codecMap.put(codecName, clazz);
-            if ( previousValue != null ) {
-                logger.warn("Multiple FlowFileCodec's found with name {}; 
choosing to use {} in place of {}", 
-                    new Object[] {codecName, clazz.getName(), 
previousValue.getName()});
+            if (previousValue != null) {
+                logger.warn("Multiple FlowFileCodec's found with name {}; 
choosing to use {} in place of {}",
+                        new Object[]{codecName, clazz.getName(), 
previousValue.getName()});
             }
         }
-        
+
         final ServiceLoader<ServerProtocol> serverProtocolLoader = 
ServiceLoader.load(ServerProtocol.class, classLoader);
         final Iterator<ServerProtocol> serverItr = 
serverProtocolLoader.iterator();
         while (serverItr.hasNext()) {
@@ -72,14 +73,14 @@ public class RemoteResourceManager {
             final String protocolName = protocol.getResourceName();
 
             Set<Class<? extends ServerProtocol>> classSet = 
serverProtocolMap.get(protocolName);
-            if ( classSet == null ) {
+            if (classSet == null) {
                 classSet = new HashSet<>();
                 serverProtocolMap.put(protocolName, classSet);
             }
-            
+
             classSet.add(clazz);
         }
-        
+
         final ServiceLoader<ClientProtocol> clientProtocolLoader = 
ServiceLoader.load(ClientProtocol.class, classLoader);
         final Iterator<ClientProtocol> clientItr = 
clientProtocolLoader.iterator();
         while (clientItr.hasNext()) {
@@ -88,133 +89,132 @@ public class RemoteResourceManager {
             final String protocolName = protocol.getResourceName();
 
             Set<Class<? extends ClientProtocol>> classSet = 
clientProtocolMap.get(protocolName);
-            if ( classSet == null ) {
+            if (classSet == null) {
                 classSet = new HashSet<>();
                 clientProtocolMap.put(protocolName, classSet);
             }
-            
+
             classSet.add(clazz);
         }
-        
+
         codecClassMap = Collections.unmodifiableMap(codecMap);
         clientProtocolClassMap = 
Collections.unmodifiableMap(clientProtocolMap);
         serverProtocolClassMap = 
Collections.unmodifiableMap(serverProtocolMap);
     }
 
-    
     public static boolean isCodecSupported(final String codecName) {
         return codecClassMap.containsKey(codecName);
     }
-    
+
     public static boolean isCodecSupported(final String codecName, final int 
version) {
-        if ( !isCodecSupported(codecName) ) {
+        if (!isCodecSupported(codecName)) {
             return false;
         }
-        
+
         final FlowFileCodec codec = createCodec(codecName);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
         return (negotiator.isVersionSupported(version));
     }
-    
+
     public static FlowFileCodec createCodec(final String codecName, final int 
version) {
         final FlowFileCodec codec = createCodec(codecName);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
-        if ( !negotiator.isVersionSupported(version) ) {
+        if (!negotiator.isVersionSupported(version)) {
             throw new IllegalArgumentException("FlowFile Codec " + codecName + 
" does not support version " + version);
         }
-        
+
         negotiator.setVersion(version);
         return codec;
     }
-    
+
     private static FlowFileCodec createCodec(final String codecName) {
         final Class<? extends FlowFileCodec> codecClass = 
codecClassMap.get(codecName);
-        if ( codecClass == null ) {
+        if (codecClass == null) {
             throw new IllegalArgumentException("Unknown Codec: " + codecName);
         }
-        
+
         try {
             return codecClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + 
codecClass.getName(), e);
         }
     }
-    
+
     public static Set<String> getSupportedCodecNames() {
         return codecClassMap.keySet();
     }
-    
+
     public static List<Integer> getSupportedVersions(final String codecName) {
         final FlowFileCodec codec = createCodec(codecName);
         return codec.getSupportedVersions();
     }
-    
+
     public static Set<Class<? extends ClientProtocol>> 
getClientProtocolClasses(final String protocolName) {
         final Set<Class<? extends ClientProtocol>> classes = 
clientProtocolClassMap.get(protocolName);
-        if ( classes == null ) {
+        if (classes == null) {
             return new HashSet<>();
         }
         return new HashSet<>(classes);
     }
-    
+
     public static Set<Class<? extends ServerProtocol>> 
getServerProtocolClasses(final String protocolName) {
         final Set<Class<? extends ServerProtocol>> classes = 
serverProtocolClassMap.get(protocolName);
-        if ( classes == null ) {
+        if (classes == null) {
             return new HashSet<>();
         }
         return new HashSet<>(classes);
     }
-    
+
     public static void setServerProtocolImplementation(final String 
protocolName, final Class<? extends ServerProtocol> clazz) {
         desiredServerProtocolClassMap.put(protocolName, clazz);
     }
-    
+
     public static void setClientProtocolImplementation(final String 
protocolName, final Class<? extends ClientProtocol> clazz) {
         desiredClientProtocolClassMap.put(protocolName, clazz);
     }
-    
+
     public static ServerProtocol createServerProtocol(final String 
protocolName) {
         final Set<Class<? extends ServerProtocol>> classSet = 
getServerProtocolClasses(protocolName);
-        if ( classSet.isEmpty() ) {
+        if (classSet.isEmpty()) {
             throw new IllegalArgumentException("Unknkown Server Protocol: " + 
protocolName);
         }
 
         Class<? extends ServerProtocol> desiredClass = 
desiredServerProtocolClassMap.get(protocolName);
-        if ( desiredClass == null && classSet.size() > 1 ) {
+        if (desiredClass == null && classSet.size() > 1) {
             throw new IllegalStateException("Multiple implementations of 
Server Protocol " + protocolName + " were found and no preferred implementation 
has been specified");
         }
-        
-        if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+        if (desiredClass != null && !classSet.contains(desiredClass)) {
             throw new IllegalStateException("Desired implementation of Server 
Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol 
is not registered as a Server Protocol");
         }
-        
-        if ( desiredClass == null ) {
+
+        if (desiredClass == null) {
             desiredClass = classSet.iterator().next();
         }
-        
+
         try {
             return desiredClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + 
desiredClass.getName(), e);
-        }        
+        }
     }
-    
+
     public static ClientProtocol createClientProtocol(final String 
protocolName) {
         final Set<Class<? extends ClientProtocol>> classSet = 
getClientProtocolClasses(protocolName);
-        if ( classSet.isEmpty() ) {
+        if (classSet.isEmpty()) {
             throw new IllegalArgumentException("Unknkown Client Protocol: " + 
protocolName);
         }
 
         Class<? extends ClientProtocol> desiredClass = 
desiredClientProtocolClassMap.get(protocolName);
-        if ( desiredClass == null && classSet.size() > 1 ) {
+        if (desiredClass == null && classSet.size() > 1) {
             throw new IllegalStateException("Multiple implementations of 
Client Protocol " + protocolName + " were found and no preferred implementation 
has been specified");
         }
-        
-        if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+        if (desiredClass != null && !classSet.contains(desiredClass)) {
             throw new IllegalStateException("Desired implementation of Client 
Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol 
is not registered as a Client Protocol");
         }
-        
-        if ( desiredClass == null ) {
+
+        if (desiredClass == null) {
             desiredClass = classSet.iterator().next();
         }
 
@@ -222,6 +222,6 @@ public class RemoteResourceManager {
             return desiredClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + 
desiredClass.getName(), e);
-        }        
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
index 59e4d0a..6f7b977 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
@@ -30,4 +30,4 @@ public interface RemoteSiteListener {
 
     void stop();
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 493d1fe..809147e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -49,43 +49,42 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketRemoteSiteListener implements RemoteSiteListener {
+
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
     private final int socketPort;
     private final SSLContext sslContext;
     private final NodeInformant nodeInformant;
     private final AtomicReference<ProcessGroup> rootGroup = new 
AtomicReference<>();
-    
+
     private final AtomicBoolean stopped = new AtomicBoolean(false);
-    
+
     private static final Logger LOG = 
LoggerFactory.getLogger(SocketRemoteSiteListener.class);
 
     public SocketRemoteSiteListener(final int socketPort, final SSLContext 
sslContext) {
         this(socketPort, sslContext, null);
     }
-    
+
     public SocketRemoteSiteListener(final int socketPort, final SSLContext 
sslContext, final NodeInformant nodeInformant) {
         this.socketPort = socketPort;
         this.sslContext = sslContext;
         this.nodeInformant = nodeInformant;
     }
 
-    
     @Override
     public void setRootGroup(final ProcessGroup rootGroup) {
         this.rootGroup.set(rootGroup);
     }
 
-    
     @Override
     public void start() throws IOException {
         final boolean secure = (sslContext != null);
-        
+
         final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(true);
         serverSocketChannel.bind(new InetSocketAddress(socketPort));
         stopped.set(false);
-        
+
         final Thread listenerThread = new Thread(new Runnable() {
             private int threadCount = 0;
 
@@ -95,19 +94,21 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     final ProcessGroup processGroup = rootGroup.get();
                     // If nodeInformant is not null, we are in clustered mode, 
which means that we don't care about
                     // the processGroup.
-                    if ( (nodeInformant == null) && (processGroup == null || 
(processGroup.getInputPorts().isEmpty() && 
processGroup.getOutputPorts().isEmpty())) ) {
-                        try { Thread.sleep(2000L); } catch (final Exception e) 
{}
+                    if ((nodeInformant == null) && (processGroup == null || 
(processGroup.getInputPorts().isEmpty() && 
processGroup.getOutputPorts().isEmpty()))) {
+                        try {
+                            Thread.sleep(2000L);
+                        } catch (final Exception e) {
+                        }
                         continue;
                     }
-                    
-                    
+
                     LOG.trace("Accepting Connection...");
                     Socket acceptedSocket = null;
                     try {
                         serverSocketChannel.configureBlocking(false);
                         final ServerSocket serverSocket = 
serverSocketChannel.socket();
                         serverSocket.setSoTimeout(2000);
-                        while ( !stopped.get() && acceptedSocket == null ) {
+                        while (!stopped.get() && acceptedSocket == null) {
                             try {
                                 acceptedSocket = serverSocket.accept();
                             } catch (final SocketTimeoutException ste) {
@@ -116,14 +117,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                         }
                     } catch (final IOException e) {
                         LOG.error("RemoteSiteListener Unable to accept 
connection due to {}", e.toString());
-                        if ( LOG.isDebugEnabled() ) {
+                        if (LOG.isDebugEnabled()) {
                             LOG.error("", e);
                         }
                         continue;
                     }
                     LOG.trace("Got connection");
-                    
-                    if ( stopped.get() ) {
+
+                    if (stopped.get()) {
                         return;
                     }
                     final Socket socket = acceptedSocket;
@@ -135,25 +136,25 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                             final InetAddress inetAddress = 
socket.getInetAddress();
                             String hostname = inetAddress.getHostName();
                             final int slashIndex = hostname.indexOf("/");
-                            if ( slashIndex == 0 ) {
+                            if (slashIndex == 0) {
                                 hostname = hostname.substring(1);
-                            } else if ( slashIndex > 0 ) {
+                            } else if (slashIndex > 0) {
                                 hostname = hostname.substring(0, slashIndex);
                             }
 
                             final int port = socket.getPort();
                             final String peerUri = "nifi://" + hostname + ":" 
+ port;
                             LOG.debug("{} Connection URL is {}", this, 
peerUri);
-                            
+
                             final CommunicationsSession commsSession;
                             final String dn;
                             try {
-                                if ( secure ) {
+                                if (secure) {
                                     final SSLSocketChannel sslSocketChannel = 
new SSLSocketChannel(sslContext, socketChannel, false);
                                     LOG.trace("Channel is secure; 
connecting...");
                                     sslSocketChannel.connect();
                                     LOG.trace("Channel connected");
-                                    
+
                                     commsSession = new 
SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
                                     dn = sslSocketChannel.getDn();
                                     commsSession.setUserDn(dn);
@@ -164,7 +165,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                 }
                             } catch (final Exception e) {
                                 LOG.error("RemoteSiteListener Unable to accept 
connection from {} due to {}", socket, e.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", e);
                                 }
                                 try {
@@ -173,135 +174,136 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                                 }
                                 return;
                             }
-                            
+
                             LOG.info("Received connection from {}, User DN: 
{}", socket.getInetAddress(), dn);
-                            
+
                             final InputStream socketIn;
                             final OutputStream socketOut;
-                            
+
                             try {
                                 socketIn = 
commsSession.getInput().getInputStream();
                                 socketOut = 
commsSession.getOutput().getOutputStream();
                             } catch (final IOException e) {
-                               LOG.error("Connection dropped from {} before 
any data was transmitted", peerUri);
-                               try {
-                                       commsSession.close();
-                               } catch (final IOException ioe) {}
-                               
-                               return;
+                                LOG.error("Connection dropped from {} before 
any data was transmitted", peerUri);
+                                try {
+                                    commsSession.close();
+                                } catch (final IOException ioe) {
+                                }
+
+                                return;
                             }
-                            
+
                             final DataInputStream dis = new 
DataInputStream(socketIn);
-                               final DataOutputStream dos = new 
DataOutputStream(socketOut);
-                               
-                               ServerProtocol protocol = null;
-                               Peer peer = null;
+                            final DataOutputStream dos = new 
DataOutputStream(socketOut);
+
+                            ServerProtocol protocol = null;
+                            Peer peer = null;
                             try {
-                               // ensure that we are communicating with 
another NiFi
+                                // ensure that we are communicating with 
another NiFi
                                 LOG.debug("Verifying magic bytes...");
-                               verifyMagicBytes(dis, peerUri);
-
-                               LOG.debug("Receiving Server Protocol 
Negotiation");
-                               protocol = 
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
-                               protocol.setRootProcessGroup(rootGroup.get());
-                                   protocol.setNodeInformant(nodeInformant);
-                               
-                                   final PeerDescription description = new 
PeerDescription("localhost", getPort(), sslContext != null);
-                               peer = new Peer(description, commsSession, 
peerUri, "nifi://localhost:" + getPort());
-                               LOG.debug("Handshaking....");
-                               protocol.handshake(peer);
-                               
-                               if (!protocol.isHandshakeSuccessful()) {
-                                   LOG.error("Handshake failed with {}; 
closing connection", peer);
-                                   try {
-                                       peer.close();
-                                   } catch (final IOException e) {
-                                       LOG.warn("Failed to close {} due to 
{}", peer, e);
-                                   }
-                                   
-                                   // no need to shutdown protocol because we 
failed to perform handshake
-                                   return;
-                               }
-                               
-                               commsSession.setTimeout((int) 
protocol.getRequestExpiration());
-                               
-                               LOG.info("Successfully negotiated 
ServerProtocol {} Version {} with {}", new Object[] {
-                                   protocol.getResourceName(), 
protocol.getVersionNegotiator().getVersion(), peer});
-                               
-                                   try {
-                                       while (!protocol.isShutdown()) {
-                                           LOG.trace("Getting Protocol Request 
Type...");
-                                           
+                                verifyMagicBytes(dis, peerUri);
+
+                                LOG.debug("Receiving Server Protocol 
Negotiation");
+                                protocol = 
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
+                                protocol.setRootProcessGroup(rootGroup.get());
+                                protocol.setNodeInformant(nodeInformant);
+
+                                final PeerDescription description = new 
PeerDescription("localhost", getPort(), sslContext != null);
+                                peer = new Peer(description, commsSession, 
peerUri, "nifi://localhost:" + getPort());
+                                LOG.debug("Handshaking....");
+                                protocol.handshake(peer);
+
+                                if (!protocol.isHandshakeSuccessful()) {
+                                    LOG.error("Handshake failed with {}; 
closing connection", peer);
+                                    try {
+                                        peer.close();
+                                    } catch (final IOException e) {
+                                        LOG.warn("Failed to close {} due to 
{}", peer, e);
+                                    }
+
+                                    // no need to shutdown protocol because we 
failed to perform handshake
+                                    return;
+                                }
+
+                                commsSession.setTimeout((int) 
protocol.getRequestExpiration());
+
+                                LOG.info("Successfully negotiated 
ServerProtocol {} Version {} with {}", new Object[]{
+                                    protocol.getResourceName(), 
protocol.getVersionNegotiator().getVersion(), peer});
+
+                                try {
+                                    while (!protocol.isShutdown()) {
+                                        LOG.trace("Getting Protocol Request 
Type...");
+
                                         int timeoutCount = 0;
                                         RequestType requestType = null;
-                                        
-                                        while ( requestType == null ) {
+
+                                        while (requestType == null) {
                                             try {
                                                 requestType = 
protocol.getRequestType(peer);
                                             } catch (final 
SocketTimeoutException e) {
                                                 // Give the timeout a bit 
longer (twice as long) to receive the Request Type,
                                                 // in order to attempt to 
receive more data without shutting down the socket if we don't
                                                 // have to.
-                                                LOG.debug("{} Timed out 
waiting to receive RequestType using {} with {}", new Object[] {this, protocol, 
peer});
+                                                LOG.debug("{} Timed out 
waiting to receive RequestType using {} with {}", new Object[]{this, protocol, 
peer});
                                                 timeoutCount++;
                                                 requestType = null;
-                                                
-                                                if ( timeoutCount >= 2 ) {
+
+                                                if (timeoutCount >= 2) {
                                                     throw e;
                                                 }
                                             }
                                         }
-                                        
+
                                         LOG.debug("Request type from {} is 
{}", protocol, requestType);
-                                           switch (requestType) {
-                                               case NEGOTIATE_FLOWFILE_CODEC:
-                                                   
protocol.negotiateCodec(peer);
-                                                   break;
-                                               case RECEIVE_FLOWFILES:
-                                                   // peer wants to receive 
FlowFiles, so we will transfer FlowFiles.
-                                                   
protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, 
String>());
-                                                   break;
-                                               case SEND_FLOWFILES:
-                                                   // Peer wants to send 
FlowFiles, so we will receive.
+                                        switch (requestType) {
+                                            case NEGOTIATE_FLOWFILE_CODEC:
+                                                protocol.negotiateCodec(peer);
+                                                break;
+                                            case RECEIVE_FLOWFILES:
+                                                // peer wants to receive 
FlowFiles, so we will transfer FlowFiles.
+                                                
protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, 
String>());
+                                                break;
+                                            case SEND_FLOWFILES:
+                                                // Peer wants to send 
FlowFiles, so we will receive.
                                                 
protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, 
String>());
-                                                   break;
-                                               case REQUEST_PEER_LIST:
-                                                   protocol.sendPeerList(peer);
-                                                   break;
-                                               case SHUTDOWN:
-                                                   protocol.shutdown(peer);
-                                                   break;
-                                           }
-                                       }
-                                       LOG.debug("Finished communicating with 
{} ({})", peer, protocol);
-                                   } catch (final Exception e) {
-                                       LOG.error("Unable to communicate with 
remote instance {} ({}) due to {}; closing connection", peer, protocol, 
e.toString());
-                                       if ( LOG.isDebugEnabled() ) {
-                                           LOG.error("", e);
-                                       }
-                                   }
+                                                break;
+                                            case REQUEST_PEER_LIST:
+                                                protocol.sendPeerList(peer);
+                                                break;
+                                            case SHUTDOWN:
+                                                protocol.shutdown(peer);
+                                                break;
+                                        }
+                                    }
+                                    LOG.debug("Finished communicating with {} 
({})", peer, protocol);
+                                } catch (final Exception e) {
+                                    LOG.error("Unable to communicate with 
remote instance {} ({}) due to {}; closing connection", peer, protocol, 
e.toString());
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.error("", e);
+                                    }
+                                }
                             } catch (final IOException e) {
                                 LOG.error("Unable to communicate with remote 
instance {} due to {}; closing connection", peer, e.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", e);
                                 }
                             } catch (final Throwable t) {
                                 LOG.error("Handshake failed when communicating 
with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", t);
                                 }
                             } finally {
                                 LOG.trace("Cleaning up");
                                 try {
-                                    if ( protocol != null && peer != null ) {
+                                    if (protocol != null && peer != null) {
                                         protocol.shutdown(peer);
                                     }
                                 } catch (final Exception protocolException) {
                                     LOG.warn("Failed to shutdown protocol due 
to {}", protocolException.toString());
                                 }
-                                
+
                                 try {
-                                    if ( peer != null ) {
+                                    if (peer != null) {
                                         peer.close();
                                     }
                                 } catch (final Exception peerException) {
@@ -320,30 +322,30 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
         listenerThread.setName("Site-to-Site Listener");
         listenerThread.start();
     }
-    
+
     @Override
     public int getPort() {
         return socketPort;
     }
-    
+
     @Override
     public void stop() {
         stopped.set(true);
     }
-    
+
     private void verifyMagicBytes(final InputStream in, final String 
peerDescription) throws IOException, HandshakeException {
         final byte[] receivedMagicBytes = new 
byte[CommunicationsSession.MAGIC_BYTES.length];
 
         // expect magic bytes
         try {
-            for (int i=0; i < receivedMagicBytes.length; i++) {
+            for (int i = 0; i < receivedMagicBytes.length; i++) {
                 receivedMagicBytes[i] = (byte) in.read();
             }
         } catch (final EOFException e) {
             throw new HandshakeException("Handshake failed (not enough bytes) 
when communicating with " + peerDescription);
         }
-        
-        if ( !Arrays.equals(CommunicationsSession.MAGIC_BYTES, 
receivedMagicBytes) ) {
+
+        if (!Arrays.equals(CommunicationsSession.MAGIC_BYTES, 
receivedMagicBytes)) {
             throw new HandshakeException("Handshake with " + peerDescription + 
" failed because the Magic Header was not present");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index eec6ed5..982d9ff 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -56,14 +56,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRemoteGroupPort extends RemoteGroupPort {
+
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
-    
+
     public static final int GZIP_COMPRESSION_LEVEL = 1;
-    
+
     private static final String CATEGORY = "Site to Site";
-    
+
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteGroupPort.class);
     private final RemoteProcessGroup remoteGroup;
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
@@ -71,28 +72,27 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
     private final SSLContext sslContext;
     private final TransferDirection transferDirection;
-    
+
     private final AtomicReference<SiteToSiteClient> clientRef = new 
AtomicReference<>();
-    
-    
-    public StandardRemoteGroupPort(final String id, final String name, final 
ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, 
+
+    public StandardRemoteGroupPort(final String id, final String name, final 
ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
             final TransferDirection direction, final ConnectableType type, 
final SSLContext sslContext, final ProcessScheduler scheduler) {
         // remote group port id needs to be unique but cannot just be the id 
of the port
         // in the remote group instance. this supports referencing the same 
remote
         // instance more than once.
         super(id, name, processGroup, type, scheduler);
-        
+
         this.remoteGroup = remoteGroup;
         this.transferDirection = direction;
         this.sslContext = sslContext;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
     }
-    
+
     private static File getPeerPersistenceFile(final String portId) {
         final File stateDir = 
NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, portId + ".peers");
     }
-    
+
     @Override
     public boolean isTargetRunning() {
         return targetRunning.get();
@@ -101,18 +101,18 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
     public void setTargetRunning(boolean targetRunning) {
         this.targetRunning.set(targetRunning);
     }
-    
+
     @Override
     public boolean isTriggerWhenEmpty() {
         return getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
     }
-    
+
     @Override
     public void shutdown() {
-       super.shutdown();
-        
+        super.shutdown();
+
         final SiteToSiteClient client = clientRef.get();
-        if ( client != null ) {
+        if (client != null) {
             try {
                 client.close();
             } catch (final IOException ioe) {
@@ -120,58 +120,57 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             }
         }
     }
-    
+
     @Override
     public void onSchedulingStart() {
         super.onSchedulingStart();
-        
+
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url(remoteGroup.getTargetUri().toString())
-            .portIdentifier(getIdentifier())
-            .sslContext(sslContext)
-            .eventReporter(remoteGroup.getEventReporter())
-            .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
-            .build();
+                .url(remoteGroup.getTargetUri().toString())
+                .portIdentifier(getIdentifier())
+                .sslContext(sslContext)
+                .eventReporter(remoteGroup.getEventReporter())
+                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+                .build();
         clientRef.set(client);
     }
-    
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        if ( !remoteGroup.isTransmitting() ) {
+        if (!remoteGroup.isTransmitting()) {
             logger.debug("{} {} is not transmitting; will not send/receive", 
this, remoteGroup);
             return;
         }
 
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && 
session.getQueueSize().getObjectCount() == 0 ) {
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && 
session.getQueueSize().getObjectCount() == 0) {
             logger.debug("{} No data to send", this);
             return;
         }
-        
+
         String url = getRemoteProcessGroup().getTargetUri().toString();
-        
+
         // If we are sending data, we need to ensure that we have at least 1 
FlowFile to send. Otherwise,
         // we don't want to create a transaction at all.
         final FlowFile firstFlowFile;
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
             firstFlowFile = session.get();
-            if ( firstFlowFile == null ) {
+            if (firstFlowFile == null) {
                 return;
             }
         } else {
             firstFlowFile = null;
         }
-        
+
         final SiteToSiteClient client = clientRef.get();
         final Transaction transaction;
         try {
-               transaction = client.createTransaction(transferDirection);
+            transaction = client.createTransaction(transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
             final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port is not in a valid 
state", this, url);
             logger.error(message);
-               session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         } catch (final UnknownPortException e) {
@@ -179,22 +178,22 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             this.targetExists.set(false);
             final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port no longer exists", 
this, url);
             logger.error(message);
-               session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         } catch (final IOException e) {
-               context.yield();
+            context.yield();
             final String message = String.format("%s failed to communicate 
with %s due to %s", this, url, e.toString());
             logger.error(message);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.error("", e);
             }
-               session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
         }
-        
-        if ( transaction == null ) {
+
+        if (transaction == null) {
             logger.debug("{} Unable to create transaction to communicate with; 
all peers must be penalized, so yielding context", this);
             session.rollback();
             context.yield();
@@ -202,11 +201,11 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         }
 
         try {
-            if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+            if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
                 transferFlowFiles(transaction, context, session, 
firstFlowFile);
             } else {
                 final int numReceived = receiveFlowFiles(transaction, context, 
session);
-                if ( numReceived == 0 ) {
+                if (numReceived == 0) {
                     context.yield();
                 }
             }
@@ -215,24 +214,22 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
         } catch (final Throwable t) {
             final String message = String.format("%s failed to communicate 
with remote NiFi instance due to %s", this, t.toString());
             logger.error("{} failed to communicate with remote NiFi instance 
due to {}", this, t.toString());
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.error("", t);
             }
-            
+
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             transaction.error();
             session.rollback();
         }
     }
 
-    
     @Override
     public String getYieldPeriod() {
         // delegate yield duration to remote process group
         return remoteGroup.getYieldDuration();
     }
-    
-    
+
     private int transferFlowFiles(final Transaction transaction, final 
ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) 
throws IOException, ProtocolException {
         FlowFile flowFile = firstFlowFile;
 
@@ -241,7 +238,7 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
             final long startSendingNanos = System.nanoTime();
             final StopWatch stopWatch = new StopWatch(true);
             long bytesSent = 0L;
-            
+
             final Set<FlowFile> flowFilesSent = new HashSet<>();
             boolean continueTransaction = true;
             while (continueTransaction) {
@@ -255,79 +252,78 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
                         transaction.send(dataPacket);
                     }
                 });
-                
+
                 final long transferNanos = System.nanoTime() - startNanos;
                 final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-                
+
                 flowFilesSent.add(flowFile);
                 bytesSent += flowFile.getSize();
                 logger.debug("{} Sent {} to {}", this, flowFile, 
transaction.getCommunicant().getUrl());
-                
+
                 final String transitUri = 
transaction.getCommunicant().getUrl() + "/" + 
flowFile.getAttribute(CoreAttributes.UUID.key());
                 session.getProvenanceReporter().send(flowFile, transitUri, 
"Remote DN=" + userDn, transferMillis, false);
                 session.remove(flowFile);
-                
+
                 final long sendingNanos = System.nanoTime() - 
startSendingNanos;
-                if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                if (sendingNanos < BATCH_SEND_NANOS) {
                     flowFile = session.get();
                 } else {
                     flowFile = null;
                 }
-                
+
                 continueTransaction = (flowFile != null);
             }
-            
+
             transaction.confirm();
-            
+
             // consume input stream entirely, ignoring its contents. If we
             // don't do this, the Connection will not be returned to the pool
             stopWatch.stop();
             final String uploadDataRate = 
stopWatch.calculateDataRate(bytesSent);
             final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesSent);
-            
+
             session.commit();
             transaction.complete();
-            
+
             final String flowFileDescription = (flowFilesSent.size() < 20) ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds 
at a rate of {}", new Object[] {
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds 
at a rate of {}", new Object[]{
                 this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
-            
+
             return flowFilesSent.size();
         } catch (final Exception e) {
             session.rollback();
             throw e;
         }
 
-        
     }
-    
+
     private int receiveFlowFiles(final Transaction transaction, final 
ProcessContext context, final ProcessSession session) throws IOException, 
ProtocolException {
         final String userDn = 
transaction.getCommunicant().getDistinguishedName();
-        
+
         final StopWatch stopWatch = new StopWatch(true);
         final Set<FlowFile> flowFilesReceived = new HashSet<>();
         long bytesReceived = 0L;
-        
+
         while (true) {
             final long start = System.nanoTime();
             final DataPacket dataPacket = transaction.receive();
-            if ( dataPacket == null ) {
+            if (dataPacket == null) {
                 break;
             }
-            
+
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             final long receiveNanos = System.nanoTime() - start;
-            
+
             String sourceFlowFileIdentifier = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            if ( sourceFlowFileIdentifier == null ) {
+            if (sourceFlowFileIdentifier == null) {
                 sourceFlowFileIdentifier = "<Unknown Identifier>";
             }
-            
+
             final String transitUri = transaction.getCommunicant().getUrl() + 
sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier, 
+            session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier,
                     "Remote DN=" + userDn, 
TimeUnit.NANOSECONDS.toMillis(receiveNanos));
 
             session.transfer(flowFile, Relationship.ANONYMOUS);
@@ -336,22 +332,22 @@ public class StandardRemoteGroupPort extends 
RemoteGroupPort {
 
         // Confirm that what we received was the correct data.
         transaction.confirm();
-        
+
         // Commit the session so that we have persisted the data
         session.commit();
 
         transaction.complete();
 
-        if ( !flowFilesReceived.isEmpty() ) {
+        if (!flowFilesReceived.isEmpty()) {
             stopWatch.stop();
             final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
             final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
             final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-            logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] { 
-                    this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
+            logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[]{
+                this, flowFileDescription, dataSize, 
transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
-        
+
         return flowFilesReceived.size();
     }
 
@@ -371,44 +367,44 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         ValidationResult error = null;
         if (!targetExists.get()) {
             error = new ValidationResult.Builder()
-                .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
-                .subject(String.format("Remote port '%s'", getName()))
-                .valid(false)
-                .build();
-        } else if ( getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty() ) {
+                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+                    .subject(String.format("Remote port '%s'", getName()))
+                    .valid(false)
+                    .build();
+        } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
             error = new ValidationResult.Builder()
-                .explanation(String.format("Port '%s' has no outbound connections", getName()))
-                .subject(String.format("Remote port '%s'", getName()))
-                .valid(false)
-                .build();
+                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
+                    .subject(String.format("Remote port '%s'", getName()))
+                    .valid(false)
+                    .build();
         }
-        
-        if ( error != null ) {
+
+        if (error != null) {
             validationErrors.add(error);
         }
-        
+
         return validationErrors;
     }
-    
+
     @Override
     public void verifyCanStart() {
         super.verifyCanStart();
-        
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty() ) {
+
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
             throw new IllegalStateException("Port " + getName() + " has no incoming connections");
         }
     }
-    
+
     @Override
     public void setUseCompression(final boolean useCompression) {
         this.useCompression.set(useCompression);
     }
-    
+
     @Override
     public boolean isUseCompression() {
         return useCompression.get();
     }
-    
+
     @Override
     public String toString() {
         return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]";
@@ -418,34 +414,32 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public RemoteProcessGroup getRemoteProcessGroup() {
         return remoteGroup;
     }
-    
+
     @Override
     public TransferDirection getTransferDirection() {
         return (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) ? TransferDirection.SEND : TransferDirection.RECEIVE;
     }
-    
+
     public void setTargetExists(final boolean exists) {
         this.targetExists.set(exists);
     }
-    
+
     @Override
     public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
         super.removeConnection(connection);
-        
-        // If the Port no longer exists on the remote instance and this is the last Connection, tell 
+
+        // If the Port no longer exists on the remote instance and this is the last Connection, tell
         // RemoteProcessGroup to remove me
-        if ( !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty() ) {
+        if (!getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty()) {
             remoteGroup.removeNonExistentPort(this);
         }
     }
-    
-    
+
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;
     }
-    
-    
+
     @Override
     public boolean isSideEffectFree() {
         return false;

Reply via email to