Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client a6293e340 -> c174d3a60


NIFI-282: Refactoring to make client from site-to-site components


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c174d3a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c174d3a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c174d3a6

Branch: refs/heads/site-to-site-client
Commit: c174d3a600358ebed8b8064247785606af6c6134
Parents: a6293e3
Author: Mark Payne <[email protected]>
Authored: Tue Jan 20 19:07:18 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Jan 20 19:07:18 2015 -0500

----------------------------------------------------------------------
 nifi/commons/site-to-site-client/pom.xml        |  13 +
 .../apache/nifi/remote/client/DataPacket.java   |  28 --
 .../nifi/remote/client/SiteToSiteClient.java    |   5 +-
 .../apache/nifi/remote/client/Transaction.java  |  21 ++
 .../socket/EndpointConnectionStatePool.java     | 309 +++++++++++++------
 .../nifi/remote/client/socket/SocketClient.java | 151 ++++++++-
 .../nifi/remote/protocol/ClientProtocol.java    |  15 +
 .../apache/nifi/remote/protocol/DataPacket.java |  29 ++
 .../protocol/socket/SocketClientProtocol.java   |  73 ++++-
 .../socket/SocketClientTransaction.java         |  66 ++++
 .../nifi/remote/util/RemoteNiFiUtils.java       | 216 +++++++++++++
 .../apache/nifi/groups/RemoteProcessGroup.java  |  30 --
 .../nifi/remote/StandardRemoteProcessGroup.java |  89 +-----
 .../util/RemoteProcessGroupUtils.java           | 216 -------------
 .../nifi/remote/RemoteResourceFactory.java      |   8 +
 .../nifi/remote/StandardRemoteGroupPort.java    |  28 +-
 .../socket/SocketFlowFileServerProtocol.java    |   9 +-
 .../apache/nifi/remote/RemoteDestination.java   |  10 -
 18 files changed, 822 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml
index 7719d55..d65f440 100644
--- a/nifi/commons/site-to-site-client/pom.xml
+++ b/nifi/commons/site-to-site-client/pom.xml
@@ -21,6 +21,19 @@
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-utils</artifactId>
        </dependency>
+       <dependency>
+               <groupId>com.sun.jersey</groupId>
+               <artifactId>jersey-client</artifactId>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>client-dto</artifactId>
+               <version>0.0.1-incubating-SNAPSHOT</version>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-web-utils</artifactId>
+       </dependency>
        
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
deleted file mode 100644
index ec77f2c..0000000
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client;
-
-import java.io.InputStream;
-import java.util.Map;
-
-public interface DataPacket {
-
-       Map<String, String> getAttributes();
-       
-       InputStream getData();
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 47a09be..34cb56a 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.remote.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-public interface SiteToSiteClient {
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface SiteToSiteClient extends Closeable {
 
        void send(DataPacket dataPacket) throws IOException;
        

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
new file mode 100644
index 0000000..bae6e51
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/Transaction.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+public interface Transaction {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index d20fb58..0718bb1 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -41,10 +41,16 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
@@ -72,21 +78,28 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSessio
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.remote.util.PeerStatusCache;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EndpointConnectionStatePool {
     public static final long PEER_REFRESH_PERIOD = 60000L;
     public static final String CATEGORY = "Site-to-Site";
+    public static final long REMOTE_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
     private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 
        private static final Logger logger = 
LoggerFactory.getLogger(EndpointConnectionStatePool.class);
        
-       private final ConcurrentMap<String, 
BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new 
ConcurrentHashMap<>();
+       private final BlockingQueue<EndpointConnectionState> 
connectionStateQueue = new LinkedBlockingQueue<>();
     private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new 
ConcurrentHashMap<>();
-
+    private final URI clusterUrl;
+    private final String apiUri;
+    
     private final AtomicLong peerIndex = new AtomicLong(0L);
     
     private final ReentrantLock peerRefreshLock = new ReentrantLock();
@@ -98,15 +111,41 @@ public class EndpointConnectionStatePool {
     private final File peersFile;
     private final EventReporter eventReporter;
     private final SSLContext sslContext;
+    private final ScheduledExecutorService taskExecutor;
+    
+    private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
+    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
+    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
+    private Integer siteToSitePort;
+    private Boolean siteToSiteSecure;
+    private long remoteRefreshTime;
+    private final Map<String, String> inputPortMap = new HashMap<>();  // map 
input port name to identifier
+    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
+    
+    private volatile int commsTimeout;
 
-    public EndpointConnectionStatePool(final EventReporter eventReporter, 
final File persistenceFile) {
-       this(null, eventReporter, persistenceFile);
+    public EndpointConnectionStatePool(final String clusterUrl, final int 
commsTimeoutMillis, final EventReporter eventReporter, final File 
persistenceFile) {
+       this(clusterUrl, commsTimeoutMillis, null, eventReporter, 
persistenceFile);
     }
     
-    public EndpointConnectionStatePool(final SSLContext sslContext, final 
EventReporter eventReporter, final File persistenceFile) {
+    public EndpointConnectionStatePool(final String clusterUrl, final int 
commsTimeoutMillis, final SSLContext sslContext, final EventReporter 
eventReporter, final File persistenceFile) {
+       try {
+               this.clusterUrl = new URI(clusterUrl);
+       } catch (final URISyntaxException e) {
+               throw new IllegalArgumentException("Invalid Cluster URL: " + 
clusterUrl);
+       }
+       
+       // Trim the trailing /
+        String uriPath = this.clusterUrl.getPath();
+        if (uriPath.endsWith("/")) {
+            uriPath = uriPath.substring(0, uriPath.length() - 1);
+        }
+        apiUri = this.clusterUrl.getScheme() + "://" + 
this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
+        
        this.sslContext = sslContext;
        this.peersFile = persistenceFile;
        this.eventReporter = eventReporter;
+       this.commsTimeout = commsTimeoutMillis;
        
        Set<PeerStatus> recoveredStatuses;
        if ( persistenceFile != null && persistenceFile.exists() ) {
@@ -119,21 +158,39 @@ public class EndpointConnectionStatePool {
        } else {
                peerStatusCache = null;
        }
+
+       // Initialize a scheduled executor and run some maintenance tasks in 
the background to kill off old, unused
+       // connections and keep our list of peers up-to-date.
+       taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+               private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
+               
+                       @Override
+                       public Thread newThread(final Runnable r) {
+                               final Thread thread = 
defaultFactory.newThread(r);
+                               thread.setName("NiFi Site-to-Site Connection 
Pool Maintenance");
+                               return thread;
+                       }
+       });
+
+       taskExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               refreshPeers();
+                       }
+       }, 0, 5, TimeUnit.SECONDS);
+
+       taskExecutor.scheduleWithFixedDelay(new Runnable() {
+                       @Override
+                       public void run() {
+                               cleanupExpiredSockets();
+                       }
+       }, 5, 5, TimeUnit.SECONDS);
     }
     
-    public EndpointConnectionState getEndpointConnectionState(final String 
clusterUrl, final RemoteDestination remoteDestination, final TransferDirection 
direction) throws IOException, HandshakeException, PortNotRunningException, 
UnknownPortException, ProtocolException {
+    public EndpointConnectionState getEndpointConnectionState(final 
RemoteDestination remoteDestination, final TransferDirection direction) throws 
IOException, HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
        //
         // Attempt to get a connection state that already exists for this URL.
         //
-        BlockingQueue<EndpointConnectionState> connectionStateQueue = 
endpointConnectionMap.get(clusterUrl);
-        if ( connectionStateQueue == null ) {
-            connectionStateQueue = new LinkedBlockingQueue<>();
-            BlockingQueue<EndpointConnectionState> existingQueue = 
endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue);
-            if ( existingQueue != null ) {
-                connectionStateQueue = existingQueue;
-            }
-        }
-        
         FlowFileCodec codec = null;
         CommunicationsSession commsSession = null;
         SocketClientProtocol protocol = null;
@@ -172,7 +229,7 @@ public class EndpointConnectionStatePool {
                 
                 
                 final String peerUrl = "nifi://" + peerStatus.getHostname() + 
":" + peerStatus.getPort();
-                peer = new Peer(commsSession, peerUrl, clusterUrl);
+                peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
                 
                 // perform handshake
                 try {
@@ -214,9 +271,8 @@ public class EndpointConnectionStatePool {
             } else {
                 final long lastTimeUsed = connectionState.getLastTimeUsed();
                 final long millisSinceLastUse = System.currentTimeMillis() - 
lastTimeUsed;
-                final long timeoutMillis = 
remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
                 
-                if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis 
) {
+                if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) 
{
                     cleanup(connectionState.getSocketClientProtocol(), 
connectionState.getPeer());
                     connectionState = null;
                 } else {
@@ -243,12 +299,7 @@ public class EndpointConnectionStatePool {
                return false;
        }
        
-       final BlockingQueue<EndpointConnectionState> queue = 
endpointConnectionMap.get(url);
-       if ( queue == null ) {
-               return false;
-       }
-       
-       return queue.offer(endpointConnectionState);
+       return connectionStateQueue.offer(endpointConnectionState);
     }
     
     /**
@@ -365,7 +416,7 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public Set<PeerStatus> getPeerStatuses() {
+    private Set<PeerStatus> getPeerStatuses() {
         final PeerStatusCache cache = this.peerStatusCache;
         if (cache == null || cache.getStatuses() == null || 
cache.getStatuses().isEmpty()) {
             return null;
@@ -384,12 +435,12 @@ public class EndpointConnectionStatePool {
         return cache.getStatuses();
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, 
final boolean secure) throws IOException, HandshakeException, 
UnknownPortException, PortNotRunningException, BadRequestException {
-       final String hostname = destinationUri.getHost();
-        final int port = destinationUri.getPort();
+    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, 
HandshakeException, UnknownPortException, PortNotRunningException, 
BadRequestException {
+       final String hostname = clusterUrl.getHost();
+        final int port = getSiteToSitePort();
        
-       final CommunicationsSession commsSession = 
establishSiteToSiteConnection(hostname, port, secure);
-        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + 
port, destinationUri.toString());
+       final CommunicationsSession commsSession = 
establishSiteToSiteConnection(hostname, port);
+        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + 
port, clusterUrl.toString());
         final SocketClientProtocol clientProtocol = new SocketClientProtocol();
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
@@ -399,8 +450,8 @@ public class EndpointConnectionStatePool {
             throw new BadRequestException(e.toString());
         }
 
-        // TODO: Make the 30000 millis configurable
-        clientProtocol.handshake(peer, null, 30000);
+        clientProtocol.setTimeout(commsTimeout);
+        clientProtocol.handshake(peer, null);
         final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
         persistPeerStatuses(peerStatuses);
 
@@ -474,38 +525,41 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
-       return establishSiteToSiteConnection(peerStatus.getHostname(), 
peerStatus.getPort(), peerStatus.isSecure());
+    private CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
+       return establishSiteToSiteConnection(peerStatus.getHostname(), 
peerStatus.getPort());
     }
     
-    public CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port, final boolean secure) throws IOException {
+    private CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port) throws IOException {
+       if ( siteToSiteSecure == null ) {
+               throw new IOException("Remote NiFi instance " + clusterUrl + " 
is not currently configured to accept site-to-site connections");
+       }
+       
         final String destinationUri = "nifi://" + hostname + ":" + port;
 
         CommunicationsSession commsSession = null;
         try {
-        if ( secure ) {
-            if ( sslContext == null ) {
-                throw new IOException("Unable to communicate with " + hostname 
+ ":" + port + " because it requires Secure Site-to-Site communications, but 
this instance is not configured for secure communications");
-            }
-            
-            final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
-            socketChannel.connect();
-    
-            commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-        } else {
-            final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(hostname, port));
-            commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
-        }
-
-        
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-        commsSession.setUri(destinationUri);
+               if ( siteToSiteSecure ) {
+                   if ( sslContext == null ) {
+                       throw new IOException("Unable to communicate with " + 
hostname + ":" + port + " because it requires Secure Site-to-Site 
communications, but this instance is not configured for secure communications");
+                   }
+                   
+                   final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
+                   socketChannel.connect();
+           
+                   commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
+                       
+                       try {
+                           commsSession.setUserDn(socketChannel.getDn());
+                       } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
+                           throw new IOException(ex);
+                       }
+               } else {
+                   final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(hostname, port));
+                   commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
+               }
+       
+               
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
+               commsSession.setUri(destinationUri);
         } catch (final IOException ioe) {
             if ( commsSession != null ) {
                 commsSession.close();
@@ -578,59 +632,52 @@ public class EndpointConnectionStatePool {
     }
     
     
-    public void cleanupExpiredSockets() {
+    private void cleanupExpiredSockets() {
         final List<EndpointConnectionState> states = new ArrayList<>();
         
-        for ( final BlockingQueue<EndpointConnectionState> queue : 
endpointConnectionMap.values() ) {
-            states.clear();
-            
-            EndpointConnectionState state;
-            while ((state = queue.poll()) != null) {
-                // If the socket has not been used in 10 seconds, shut it down.
-                final long lastUsed = state.getLastTimeUsed();
-                if ( lastUsed < System.currentTimeMillis() - 10000L ) {
-                    try {
-                        
state.getSocketClientProtocol().shutdown(state.getPeer());
-                    } catch (final Exception e) {
-                        logger.debug("Failed to shut down {} using {} due to 
{}", 
-                            new Object[] {state.getSocketClientProtocol(), 
state.getPeer(), e} );
-                    }
-                    
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                } else {
-                    states.add(state);
+        EndpointConnectionState state;
+        while ((state = connectionStateQueue.poll()) != null) {
+            // If the socket has not been used in 10 seconds, shut it down.
+            final long lastUsed = state.getLastTimeUsed();
+            if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+                try {
+                    state.getSocketClientProtocol().shutdown(state.getPeer());
+                } catch (final Exception e) {
+                    logger.debug("Failed to shut down {} using {} due to {}", 
+                        new Object[] {state.getSocketClientProtocol(), 
state.getPeer(), e} );
                 }
+                
+                cleanup(state.getSocketClientProtocol(), state.getPeer());
+            } else {
+                states.add(state);
             }
-            
-            queue.addAll(states);
         }
+        
+        connectionStateQueue.addAll(states);
     }
     
     public void shutdown() {
+       taskExecutor.shutdown();
        peerTimeoutExpirations.clear();
             
         for ( final CommunicationsSession commsSession : activeCommsChannels ) 
{
             commsSession.interrupt();
         }
         
-        for ( final BlockingQueue<EndpointConnectionState> queue : 
endpointConnectionMap.values() ) {
-            EndpointConnectionState state;
-            while ( (state = queue.poll()) != null)  {
-                cleanup(state.getSocketClientProtocol(), state.getPeer());
-            }
+        EndpointConnectionState state;
+        while ( (state = connectionStateQueue.poll()) != null)  {
+            cleanup(state.getSocketClientProtocol(), state.getPeer());
         }
-        
-        endpointConnectionMap.clear();
     }
     
-    public void refreshPeers(final URI targetUri, final boolean secure) {
+    private void refreshPeers() {
         final PeerStatusCache existingCache = peerStatusCache;
         if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
             return;
         }
 
         try {
-            final Set<PeerStatus> statuses = 
fetchRemotePeerStatuses(targetUri, secure);
+            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
             peerStatusCache = new PeerStatusCache(statuses);
             logger.info("{} Successfully refreshed Peer Status; remote 
instance consists of {} peers", this, statuses.size());
         } catch (Exception e) {
@@ -639,6 +686,92 @@ public class EndpointConnectionStatePool {
                 logger.warn("", e);
             }
         }
+    }
+    
+    
+    public String getInputPortIdentifier(final String portName) throws 
IOException {
+        return getPortIdentifier(portName, inputPortMap);
+    }
+    
+    public String getOutputPortIdentifier(final String portName) throws 
IOException {
+       return getPortIdentifier(portName, outputPortMap);
+    }
+    
+    
+    private String getPortIdentifier(final String portName, final Map<String, 
String> portMap) throws IOException {
+       String identifier;
+       remoteInfoReadLock.lock();
+        try {
+               identifier = portMap.get(portName);
+        } finally {
+               remoteInfoReadLock.unlock();
+        }
+        
+        if ( identifier != null ) {
+               return identifier;
+        }
+        
+        refreshRemoteInfo();
+
+       remoteInfoReadLock.lock();
+        try {
+               return portMap.get(portName);
+        } finally {
+               remoteInfoReadLock.unlock();
+        }
+    }
+    
+    
+    private ControllerDTO refreshRemoteInfo() throws IOException {
+       final boolean webInterfaceSecure = 
clusterUrl.toString().startsWith("https");
+        final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? 
sslContext : null);
+               final ControllerDTO controller = 
utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
+        
+        remoteInfoWriteLock.lock();
+        try {
+            this.siteToSitePort = controller.getRemoteSiteListeningPort();
+            this.siteToSiteSecure = controller.isSiteToSiteSecure();
+            
+            inputPortMap.clear();
+            for (final PortDTO inputPort : controller.getInputPorts()) {
+               inputPortMap.put(inputPort.getName(), inputPort.getId());
+            }
+            
+            outputPortMap.clear();
+            for ( final PortDTO outputPort : controller.getOutputPorts()) {
+               outputPortMap.put(outputPort.getName(), outputPort.getId());
+            }
+            
+            this.remoteRefreshTime = System.currentTimeMillis();
+        } finally {
+               remoteInfoWriteLock.unlock();
+        }
+        
+        return controller;
+    }
+    
+    /**
+     * @return the port that the remote instance is listening on for
+     * site-to-site communication, or <code>null</code> if the remote instance
+     * is not configured to allow site-to-site communications.
+     *
+     * @throws IOException if unable to communicate with the remote instance
+     */
+    private Integer getSiteToSitePort() throws IOException {
+        Integer listeningPort;
+        remoteInfoReadLock.lock();
+        try {
+            listeningPort = this.siteToSitePort;
+            if (listeningPort != null && this.remoteRefreshTime > 
System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+                return listeningPort;
+            }
+        } finally {
+               remoteInfoReadLock.unlock();
+        }
+
+        final ControllerDTO controller = refreshRemoteInfo();
+        listeningPort = controller.getRemoteSiteListeningPort();
 
+        return listeningPort;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 48e9cc5..b81b425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -16,16 +16,87 @@
  */
 package org.apache.nifi.remote.client.socket;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.remote.client.DataPacket;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 public class SocketClient implements SiteToSiteClient {
-
+       private final EndpointConnectionStatePool pool;
+       private final boolean compress;
+       private final String portName;
+       private final long penalizationNanos;
+       private volatile String portIdentifier;
+       
+       /**
+        * Creates a client from the values collected by the given Builder.
+        *
+        * @param builder the Builder holding the configuration for this client
+        */
+       private SocketClient(final Builder builder) {
+               // The pool takes its timeout as an int number of milliseconds. Clamp before narrowing so
+               // that a very large timeout cannot be truncated into a garbage (possibly negative) value.
+               final long timeoutMillis = TimeUnit.NANOSECONDS.toMillis(builder.timeoutNanos);
+               final int poolTimeoutMillis = (int) Math.min(timeoutMillis, Integer.MAX_VALUE);
+               
+               pool = new EndpointConnectionStatePool(builder.url, poolTimeoutMillis,
+                               builder.sslContext, builder.eventReporter, builder.peerPersistenceFile);
+               
+               this.compress = builder.useCompression;
+               this.portIdentifier = builder.portIdentifier;
+               this.portName = builder.portName;
+               this.penalizationNanos = builder.penalizationNanos;
+       }
+       
+       
+       /**
+        * Resolves the identifier of the remote port to communicate with. An explicitly
+        * configured identifier takes precedence; otherwise the configured port name is
+        * looked up in the pool as an input port when sending and as an output port otherwise.
+        *
+        * @param direction the direction in which data is going to be transferred
+        * @return the configured identifier, or the result of the pool lookup by port name
+        * @throws IOException declared for the lookups performed against the pool
+        */
+       private String getPortIdentifier(final TransferDirection direction) throws IOException {
+               final String configuredId = this.portIdentifier;
+               if ( configuredId == null ) {
+                       final boolean sending = (direction == TransferDirection.SEND);
+                       return sending ? pool.getInputPortIdentifier(this.portName) : pool.getOutputPortIdentifier(this.portName);
+               }
+               
+               return configuredId;
+       }
+       
+       
+       /**
+        * Prepares to send the given packet: resolves the identifier of the remote port,
+        * describes the destination and obtains a connection state for it from the pool.
+        *
+        * NOTE(review): in this revision the obtained connection state is not used afterwards
+        * and dataPacket is never written anywhere, so the method looks unfinished.
+        *
+        * @param dataPacket the packet to send
+        * @throws IOException if no port identifier could be resolved, or if obtaining the
+        *                     connection state fails (protocol-level failures are wrapped)
+        */
        @Override
        public void send(final DataPacket dataPacket) throws IOException {
-               // TODO Auto-generated method stub
+               final String portId = getPortIdentifier(TransferDirection.SEND);
+               
+               if ( portId == null ) {
+                       throw new IOException("Could not find Port with name " 
+ portName + " for remote NiFi instance");
+               }
+               
+               // Adapter exposing this client's settings (port id, penalization period, compression)
+               // in the form expected by the connection pool.
+               final RemoteDestination remoteDestination = new 
RemoteDestination() {
+                       @Override
+                       public String getIdentifier() {
+                               return portId;
+                       }
+
+                       @Override
+                       public long getYieldPeriod(final TimeUnit timeUnit) {
+                               return timeUnit.convert(penalizationNanos, 
TimeUnit.NANOSECONDS);
+                       }
+
+                       @Override
+                       public boolean isUseCompression() {
+                               return compress;
+                       }
+               };
+               
+               final EndpointConnectionState connectionState;
+               try {
+                       connectionState = 
pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+               } catch (final ProtocolException | HandshakeException | 
PortNotRunningException | UnknownPortException e) {
+                       // Callers only see IOException; the original failure is kept as the cause.
+                       throw new IOException(e);
+               }
+               
+               
        }
 
        @Override
@@ -33,5 +104,79 @@ public class SocketClient implements SiteToSiteClient {
                // TODO Auto-generated method stub
                return null;
        }
+       
+       /**
+        * Closes this client by shutting down its underlying connection pool.
+        *
+        * @throws IOException declared by the method signature
+        */
+       @Override
+       public void close() throws IOException {
+               pool.shutdown();
+       }
 
+       
+       /**
+        * Fluent builder for {@link SocketClient} instances. A URL and either a port name or a
+        * port identifier are required. The timeout defaults to 30 seconds, the node penalization
+        * period to 3 seconds and compression to off; all other values default to <code>null</code>.
+        */
+       public static class Builder {
+               private String url;
+               private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+               private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+               private SSLContext sslContext;
+               private EventReporter eventReporter;
+               private File peerPersistenceFile;
+               private boolean useCompression;
+               private String portName;
+               private String portIdentifier;
+               
+               /**
+                * @param url the URL of the remote NiFi instance (required)
+                * @return this Builder
+                */
+               public Builder url(final String url) {
+                       this.url = url;
+                       return this;
+               }
+               
+               /**
+                * @param timeout the communications timeout
+                * @param unit the unit in which the timeout is expressed
+                * @return this Builder
+                */
+               public Builder timeout(final long timeout, final TimeUnit unit) {
+                       this.timeoutNanos = unit.toNanos(timeout);
+                       return this;
+               }
+               
+               /**
+                * @param period the node penalization period
+                * @param unit the unit in which the period is expressed
+                * @return this Builder
+                */
+               public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) {
+                       this.penalizationNanos = unit.toNanos(period);
+                       return this;
+               }
+               
+               /**
+                * @param sslContext the SSL context to hand to the connection pool
+                * @return this Builder
+                */
+               public Builder sslContext(final SSLContext sslContext) {
+                       this.sslContext = sslContext;
+                       return this;
+               }
+               
+               /**
+                * @param eventReporter the event reporter to hand to the connection pool
+                * @return this Builder
+                */
+               public Builder eventReporter(final EventReporter eventReporter) {
+                       this.eventReporter = eventReporter;
+                       return this;
+               }
+               
+               /**
+                * @param peerPersistenceFile the peer persistence file to hand to the connection pool
+                * @return this Builder
+                */
+               public Builder peerPersistenceFile(final File peerPersistenceFile) {
+                       this.peerPersistenceFile = peerPersistenceFile;
+                       return this;
+               }
+               
+               /**
+                * @param compress whether or not compression should be used
+                * @return this Builder
+                */
+               public Builder useCompression(final boolean compress) {
+                       this.useCompression = compress;
+                       return this;
+               }
+               
+               /**
+                * @param portName the name of the remote port; used to look up the identifier when none is given
+                * @return this Builder
+                */
+               public Builder portName(final String portName) {
+                       this.portName = portName;
+                       return this;
+               }
+               
+               /**
+                * @param portIdentifier the identifier of the remote port; takes precedence over the port name
+                * @return this Builder
+                */
+               public Builder portIdentifier(final String portIdentifier) {
+                       this.portIdentifier = portIdentifier;
+                       return this;
+               }
+               
+               /**
+                * Creates the client from the values configured so far.
+                *
+                * @return a new SocketClient
+                * @throws IllegalStateException if no URL, or neither a port name nor a port identifier, has been set
+                */
+               public SocketClient build() {
+                       if ( url == null ) {
+                               throw new IllegalStateException("Must specify URL to build Site-to-Site client");
+                       }
+                       
+                       if ( portName == null && portIdentifier == null ) {
+                               throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client");
+                       }
+                       
+                       return new SocketClient(this);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 32274eb..d817425 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -46,6 +47,20 @@ public interface ClientProtocol extends 
VersionedRemoteResource {
 
     boolean isReadyForFileTransfer();
 
+    
+    
+    
+    /**
+     * Starts a transaction with the given peer for the given transfer direction.
+     *
+     * @param peer the peer to communicate with
+     * @param direction the direction of the transfer
+     * @throws IOException if unable to communicate with the peer
+     */
+    void startTransaction(Peer peer, TransferDirection direction) throws 
IOException;
+    
+    /**
+     * NOTE(review): no implementation is visible in this change; presumably finishes
+     * the transaction begun by startTransaction - exact semantics to be confirmed.
+     */
+    void completeTransaction();
+    
+    /**
+     * NOTE(review): no implementation is visible in this change; presumably abandons
+     * the transaction begun by startTransaction - exact semantics to be confirmed.
+     */
+    void rollbackTransaction();
+    
+    /**
+     * Transfers the given packet to the given peer.
+     *
+     * @param peer the peer to send to
+     * @param dataPacket the data to send
+     * @param codec the codec to use; NOTE(review): presumably used to encode the packet - confirm
+     * @throws IOException if unable to communicate with the peer
+     * @throws ProtocolException declared for protocol-level failures
+     */
+    void transferData(Peer peer, DataPacket dataPacket, FlowFileCodec codec) 
throws IOException, ProtocolException;
+    
+    /**
+     * Receives a packet from the given peer.
+     *
+     * @param peer the peer to receive from
+     * @param codec the codec to use; NOTE(review): presumably used to decode the packet - confirm
+     * @return the received data; NOTE(review): the socket implementation in this change returns
+     *         <code>null</code> when the peer reports no more data - confirm this is the contract
+     * @throws IOException if unable to communicate with the peer
+     * @throws ProtocolException declared for protocol-level failures
+     */
+    DataPacket receiveData(Peer peer, FlowFileCodec codec) throws IOException, 
ProtocolException;
+    
+    
     /**
      * returns <code>true</code> if remote instance indicates that the port is
      * invalid

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+/**
+ * A unit of data handed to, or returned by, the site-to-site client and protocol:
+ * a set of String attributes together with a stream of content and its size.
+ */
+public interface DataPacket {
+
+       /**
+        * @return the attributes of this packet, as String keys mapped to String values
+        */
+       Map<String, String> getAttributes();
+       
+       /**
+        * @return a stream from which the content of this packet can be read
+        */
+       InputStream getData();
+       
+       /**
+        * @return the size of this packet; NOTE(review): presumably the number of bytes
+        *         available from getData() - confirm
+        */
+       long getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 560385c..6b0c94b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -41,7 +41,9 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.Transaction;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -50,6 +52,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
@@ -60,7 +63,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(4, 3, 2, 1);
 
     private RemoteDestination destination;
-    private boolean useCompression;
+    private boolean useCompression = false;
     
     private String commsIdentifier;
     private boolean handshakeComplete = false;
@@ -70,6 +73,7 @@ public class SocketClientProtocol implements ClientProtocol {
     private Response handshakeResponse = null;
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
+    private int timeoutMillis = 30000;
     
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
     
@@ -81,13 +85,16 @@ public class SocketClientProtocol implements ClientProtocol 
{
         this.useCompression = destination.isUseCompression();
     }
     
+    /**
+     * Sets the timeout kept by this protocol instance; the field defaults to 30000.
+     * NOTE(review): presumably this replaces the timeout that used to be passed to
+     * handshake(Peer, String, int) - confirm where the value is read.
+     *
+     * @param timeoutMillis the timeout, in milliseconds
+     */
+    public void setTimeout(final int timeoutMillis) {
+       this.timeoutMillis = timeoutMillis;
+    }
     
     @Override
     public void handshake(final Peer peer) throws IOException, 
HandshakeException {
-       handshake(peer, destination.getIdentifier(), (int) 
destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+       handshake(peer, destination.getIdentifier());
     }
     
-    public void handshake(final Peer peer, final String destinationId, final 
int timeoutMillis) throws IOException, HandshakeException {
+    public void handshake(final Peer peer, final String destinationId) throws 
IOException, HandshakeException {
         if ( handshakeComplete ) {
             throw new IllegalStateException("Handshake has already been 
completed");
         }
@@ -228,6 +235,65 @@ public class SocketClientProtocol implements 
ClientProtocol {
         return codec;
     }
 
+
+    // TODO: move up to top with member variables
+    private SocketClientTransaction transaction;
+    
+    /**
+     * Starts a new transaction with the given peer by announcing to it whether this
+     * side is about to receive FlowFiles from it or send FlowFiles to it.
+     *
+     * @param peer the peer to communicate with
+     * @param direction the direction of the transfer, from this client's point of view
+     * @throws IOException if unable to create the transaction or to write the request to the peer
+     * @throws IllegalStateException if the handshake has not been performed, or if it did not
+     *                               leave this protocol ready for file transfer
+     */
+    @Override
+    public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been performed");
+        }
+        if ( !readyForFileTransfer ) {
+            throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
+        }
+        
+        transaction = new SocketClientTransaction(peer, direction, useCompression);
+
+        // Announce the kind of exchange that follows: RECEIVE asks the peer for data,
+        // any other direction tells the peer that data is about to be sent to it.
+        final RequestType requestType = (direction == TransferDirection.RECEIVE)
+            ? RequestType.RECEIVE_FLOWFILES
+            : RequestType.SEND_FLOWFILES;
+
+        final DataOutputStream out = transaction.getDataOutputStream();
+        requestType.writeRequestType(out);
+        out.flush();
+    }
+    
+    /**
+     * Asks the peer of the current transaction whether it has data to send and, if it
+     * does not, reports that to the caller.
+     *
+     * NOTE(review): work in progress. ClientProtocol declares receiveData(Peer, FlowFileCodec)
+     * whereas this override takes only the codec, and the MORE_DATA path below does not yet
+     * read or return a DataPacket - confirm the intended signature and finish the read path.
+     *
+     * @param codec the codec to use; not used by the code written so far
+     * @return <code>null</code> if the peer indicates that it has no more data
+     * @throws IOException if unable to communicate with the peer
+     * @throws ProtocolException if the peer answers with an unexpected response code
+     * @throws IllegalStateException if no transaction has been started
+     */
+    @Override
+    public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException {
+        if ( transaction == null ) {
+            throw new IllegalStateException("Cannot receive data because no transaction has been started");
+        }
+        
+        final Peer peer = transaction.getPeer();
+        logger.debug("{} Receiving FlowFiles from {}", this, peer);
+        final CommunicationsSession commsSession = peer.getCommunicationsSession();
+        final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+        String userDn = commsSession.getUserDn();
+        if ( userDn == null ) {
+            userDn = "none";
+        }
+        
+        // Determine if Peer will send us data or has no data to send us
+        final Response dataAvailableCode = Response.read(dis);
+        switch (dataAvailableCode.getCode()) {
+            case MORE_DATA:
+                logger.debug("{} {} Indicates that data is available", this, peer);
+                break;
+            case NO_MORE_DATA:
+                // The message has two placeholders, so both 'this' and the peer must be supplied;
+                // with only the peer, it was rendered in the first slot and the second stayed "{}".
+                logger.debug("{} No data available from {}", this, peer);
+                return null;
+            default:
+                throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+        }
+        
+        // NOTE(review): nothing is read or returned after MORE_DATA yet (see the method comment).
+    }
+    
     
     @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
@@ -258,6 +324,7 @@ public class SocketClientProtocol implements ClientProtocol 
{
                 logger.debug("{} {} Indicates that data is available", this, 
peer);
                 break;
             case NO_MORE_DATA:
+               context.yield();
                 logger.debug("{} No data available from {}", peer);
                 return;
             default:

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..0c4ce05
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.Transaction;
+import org.apache.nifi.remote.io.CompressionInputStream;
+
+/**
+ * Holds the per-transaction state of a socket-based site-to-site exchange: the peer,
+ * the transfer direction and the streams wrapped around the peer's communications session.
+ * Data read through the checked input stream is run through a CRC32 checksum.
+ */
+public class SocketClientTransaction implements Transaction {
+       private final long startTime = System.nanoTime();
+       private long bytesReceived = 0L;
+       private CRC32 crc = new CRC32();
+       
+       private final Peer peer;
+       private final TransferDirection direction;
+       
+       private final DataInputStream dis;
+       private final DataOutputStream dos;
+       private final CheckedInputStream checkedInputStream;
+       
+       /**
+        * @param peer the peer this transaction communicates with
+        * @param direction the direction in which data is transferred
+        * @param useCompression whether incoming data is to be read through a CompressionInputStream
+        * @throws IOException if unable to obtain the streams of the peer's communications session
+        */
+       SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
+               this.peer = peer;
+               this.direction = direction;
+               
+               this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+               this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+               
+               // Optionally decompress first, then feed everything that is read into the checksum.
+               InputStream payloadStream = dis;
+               if ( useCompression ) {
+                       payloadStream = new CompressionInputStream(dis);
+               }
+               this.checkedInputStream = new CheckedInputStream(payloadStream, crc);
+       }
+       
+       /**
+        * @return the checksum-computing stream from which incoming data is read
+        */
+       CheckedInputStream getCheckedInputStream() {
+               return checkedInputStream;
+       }
+       
+       /**
+        * @return the stream used to write to the peer
+        */
+       DataOutputStream getDataOutputStream() {
+               return dos;
+       }
+       
+       /**
+        * @return the peer this transaction communicates with
+        */
+       Peer getPeer() {
+               return peer;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ * Helper for talking to the REST API of a remote NiFi instance: issuing GET and HEAD
+ * requests, fetching the details of the remote controller and issuing a registration request.
+ */
+public class RemoteNiFiUtils {
+
+    public static final String CONTROLLER_URI_PATH = "/controller";
+
+    private static final int CONNECT_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+    
+    private final Client client;
+    
+    /**
+     * @param sslContext the SSL context to create the HTTP client with, or <code>null</code>
+     *                   to create the client without one
+     */
+    public RemoteNiFiUtils(final SSLContext sslContext) {
+        this.client = getClient(sslContext);
+    }
+    
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the response to the request
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, timeoutMillis, null);
+    }
+    
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @param queryParams the query parameters to add to the request; may be <code>null</code>
+     * @return the response to the request
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        if ( queryParams != null ) {
+            for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) {
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri the URI to request
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the response to the request
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis);
+        return webResource.head();
+    }
+
+    /**
+     * Creates the HTTP client, with the default read and connect timeouts applied.
+     * 
+     * @param sslContext the SSL context to create the client with, or <code>null</code>
+     *                   to create the client without one
+     * @return the newly created client
+     */
+    private Client getClient(final SSLContext sslContext) {
+        final Client client;
+        if (sslContext == null) {
+            client = WebUtils.createClient(null);
+        } else {
+            client = WebUtils.createClient(null, sslContext);
+        }
+
+        client.setReadTimeout(READ_TIMEOUT);
+        client.setConnectTimeout(CONNECT_TIMEOUT);
+
+        return client;
+    }
+    
+    /**
+     * Builds the URI of the controller resource underneath the given base URI.
+     * 
+     * @param uri the base URI of the remote instance
+     * @return the given URI with {@link #CONTROLLER_URI_PATH} appended
+     * @throws IOException if the resulting URI is not syntactically valid
+     */
+    private static URI createControllerUri(final String uri) throws IOException {
+        try {
+            return new URI(uri + CONTROLLER_URI_PATH);
+        } catch (final URISyntaxException e) {
+            // Keep the syntax error as the cause so that the offending part of the URI is not lost.
+            throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri, e);
+        }
+    }
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the remote site-to-site listening port, or <code>null</code>
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP 200
+     */
+    public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+        return getRemoteListeningPort(createControllerUri(uri), timeoutMillis);
+    }
+    
+    /**
+     * Returns the id reported by the controller of the remote instance.
+     * 
+     * @param uri the base URI of the remote instance
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the id of the remote controller
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP 200
+     */
+    public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+        return getRemoteRootGroupId(createControllerUri(uri), timeoutMillis);
+    }
+    
+    /**
+     * Returns the instance id reported by the controller of the remote instance.
+     * 
+     * @param uri the base URI of the remote instance
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the instance id of the remote controller
+     * @throws IOException if the URI is invalid or the remote instance does not answer with HTTP 200
+     */
+    public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+        return getController(createControllerUri(uri), timeoutMillis).getInstanceId();
+    }
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the full URI to fetch, including the path.
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the remote site-to-site listening port, or <code>null</code>
+     * @throws IOException if the remote instance does not answer with HTTP 200
+     */
+    private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+    }
+    
+    /**
+     * @param uri the full URI to fetch, including the path.
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the id of the remote controller
+     * @throws IOException if the remote instance does not answer with HTTP 200
+     */
+    private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getId();
+    }
+    
+    /**
+     * Fetches the controller details from the given URI.
+     * 
+     * @param uri the full URI of the controller resource
+     * @param timeoutMillis the read and connect timeout to apply to the request, in milliseconds
+     * @return the controller described by the response
+     * @throws IOException if the response status is anything other than HTTP 200; the message
+     *                     carries the status code, reason phrase and response body
+     */
+    public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+        final ClientResponse response = get(uri, timeoutMillis);
+        
+        if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+            final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+            return entity.getController();
+        } else {
+            final String responseMessage = response.getEntity(String.class);
+            throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+        }
+    }
+    
+    /**
+     * Issues a registration request on behalf of the current user.
+     * 
+     * @param baseApiUri the base URI of the remote instance's API
+     * @return the response to the registration request
+     */
+    public ClientResponse issueRegistrationRequest(String baseApiUri) {
+        // NOTE(review): the format string and the path both contribute a '/', so the resulting
+        // URI contains "//controller/users" - confirm that the remote side tolerates this.
+        final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+        // set up the form parameters that are sent as the body of the request
+        MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+        
+        // create the web resource
+        WebResource webResource = client.resource(uri);
+        
+        // get the client utils and make the request
+        return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index 9f2dac8..ac41cba 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.groups;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Date;
 import java.util.Set;
@@ -109,15 +108,6 @@ public interface RemoteProcessGroup {
     String getCommunicationsTimeout();
 
     /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    Integer getListeningPort() throws IOException;
-
-    /**
      * Indicates whether or not the RemoteProcessGroup is currently scheduled 
to
      * transmit data
      *
@@ -229,24 +219,4 @@ public interface RemoteProcessGroup {
     void verifyCanStopTransmitting();
 
     void verifyCanUpdate();
-
-    /**
-     * Returns a set of PeerStatus objects that describe the different peers
-     * that we can communicate with for this RemoteProcessGroup.
-     *
-     * If the destination is a cluster, this set will contain PeerStatuses for
-     * each of the nodes in the cluster.
-     *
-     * If the destination is a standalone instance, this set will contain just 
a
-     * PeerStatus for the destination.
-     *
-     * Once the PeerStatuses have been obtained, they may be cached by this
-     * RemoteProcessGroup for some amount of time.
-     *
-     * If unable to obtain the PeerStatuses or no peer status has yet been
-     * obtained, will return null.
-     *
-     * @return
-     */
-    Set<PeerStatus> getPeerStatuses();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 857add9..db0aeb7 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -19,7 +19,6 @@ package org.apache.nifi.remote;
 import static java.util.Objects.requireNonNull;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -50,7 +49,6 @@ import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.util.RemoteProcessGroupUtils;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
@@ -59,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
@@ -85,7 +84,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     public static final String CONTROLLER_URI_PATH = "/controller";
     public static final String ROOT_GROUP_STATUS_URI_PATH = 
"/controller/process-groups/root/status";
-    public static final long LISTENING_PORT_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
 
     // status codes
     public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
@@ -127,9 +125,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 
0, 0, 0);
     private Long refreshContentsTimestamp = null;
-    private Integer listeningPort;
-    private long listeningPortRetrievalTime = 0L;
     private Boolean destinationSecure;
+    private Integer listeningPort;
 
     private volatile String authorizationIssue;
 
@@ -175,48 +172,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             }
         };
 
-        endpointConnectionPool = new EndpointConnectionStatePool(sslContext, 
eventReporter, getPeerPersistenceFile());
+        endpointConnectionPool = new 
EndpointConnectionStatePool(getTargetUri().toString(), 
getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
+                       sslContext, eventReporter, getPeerPersistenceFile());
         
-        final Runnable socketCleanup = new Runnable() {
-            @Override
-            public void run() {
-                final Set<StandardRemoteGroupPort> ports = new HashSet<>();
-                readLock.lock();
-                try {
-                    ports.addAll(inputPorts.values());
-                    ports.addAll(outputPorts.values());
-                } finally {
-                    readLock.unlock();
-                }
-
-                endpointConnectionPool.cleanupExpiredSockets();
-            }
-        };
-
-        final Runnable refreshPeers = new Runnable() {
-            @Override
-            public void run() {
-               final boolean secure;
-               try {
-                       secure = isSecure();
-                               } catch (CommunicationsException e) {
-                                       logger.warn("{} Unable to determine if 
remote instance {} is configured for secure site-to-site due to {}; will not 
refresh list of peers", new Object[] {this, getTargetUri(), e.toString()});
-                                       if ( logger.isDebugEnabled() ) {
-                                               logger.warn("", e);
-                                       }
-                                       return;
-                               }
-               
-               endpointConnectionPool.refreshPeers(getTargetUri(), secure);
-            }
-        };
-
         final Runnable checkAuthorizations = new InitializationTask();
 
         backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + 
id + ": " + targetUri);
         backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 
0L, 30L, TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(refreshPeers, 0, 5, 
TimeUnit.SECONDS);
-        backgroundThreadExecutor.scheduleWithFixedDelay(socketCleanup, 10L, 
10L, TimeUnit.SECONDS);
     }
 
     @Override
@@ -810,7 +772,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             return;
         }
 
-        final RemoteProcessGroupUtils utils = new 
RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+        final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? 
sslContext : null);
         final String uriVal = apiUri.toString() + CONTROLLER_URI_PATH;
         URI uri;
         try {
@@ -950,39 +912,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return descriptor;
     }
 
-    /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    @Override
-    public Integer getListeningPort() throws IOException {
-        Integer listeningPort;
-        readLock.lock();
-        try {
-            listeningPort = this.listeningPort;
-            if (listeningPort != null && this.listeningPortRetrievalTime > 
System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
-                return listeningPort;
-            }
-        } finally {
-            readLock.unlock();
-        }
-
-        final RemoteProcessGroupUtils utils = new 
RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
-        listeningPort = utils.getRemoteListeningPort(apiUri.toString(), 
getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-        writeLock.lock();
-        try {
-            this.listeningPort = listeningPort;
-            this.listeningPortRetrievalTime = System.currentTimeMillis();
-        } finally {
-            writeLock.unlock();
-        }
-
-        return listeningPort;
-    }
-
     @Override
     public boolean isTransmitting() {
         return transmitting.get();
@@ -1218,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         @Override
         public void run() {
             try {
-                final RemoteProcessGroupUtils utils = new 
RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+                final RemoteNiFiUtils utils = new 
RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
                 final ClientResponse response = utils.get(new URI(apiUri + 
CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
                 
                 final int statusCode = response.getStatus();
@@ -1398,12 +1327,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         }
     }
 
-    @Override
-    public Set<PeerStatus> getPeerStatuses() {
-       return endpointConnectionPool.getPeerStatuses();
-    }
-
-
     private File getPeerPersistenceFile() {
         final File stateDir = 
NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, getIdentifier() + ".peers");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
deleted file mode 100644
index 10208f8..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/controller/util/RemoteProcessGroupUtils.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.util;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-import javax.net.ssl.SSLContext;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.entity.ControllerEntity;
-import org.apache.nifi.web.util.WebUtils;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.UniformInterfaceException;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
-/**
- *
- */
-public class RemoteProcessGroupUtils {
-
-    public static final String CONTROLLER_URI_PATH = "/controller";
-
-    private static final int CONNECT_TIMEOUT = 10000;
-    private static final int READ_TIMEOUT = 10000;
-    
-    private final Client client;
-    
-    public RemoteProcessGroupUtils(final SSLContext sslContext) {
-        this.client = getClient(sslContext);
-    }
-    
-
-    /**
-     * Gets the content at the specified URI.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse get(final URI uri, final int timeoutMillis) throws 
ClientHandlerException, UniformInterfaceException {
-        return get(uri, timeoutMillis, null);
-    }
-    
-    /**
-     * Gets the content at the specified URI using the given query parameters.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @param queryParams 
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse get(final URI uri, final int timeoutMillis, final 
Map<String, String> queryParams) throws ClientHandlerException, 
UniformInterfaceException {
-        // perform the request
-        WebResource webResource = client.resource(uri);
-        if ( queryParams != null ) {
-            for ( final Map.Entry<String, String> queryEntry : 
queryParams.entrySet() ) {
-                webResource = webResource.queryParam(queryEntry.getKey(), 
queryEntry.getValue());
-            }
-        }
-
-        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, 
timeoutMillis);
-        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
timeoutMillis);
-
-        return 
webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
-    }
-
-    /**
-     * Performs a HEAD request to the specified URI.
-     *
-     * @param uri
-     * @param timeoutMillis
-     * @return
-     * @throws ClientHandlerException
-     * @throws UniformInterfaceException
-     */
-    public ClientResponse head(final URI uri, final int timeoutMillis) throws 
ClientHandlerException, UniformInterfaceException {
-        // perform the request
-        WebResource webResource = client.resource(uri);
-        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, 
timeoutMillis);
-        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
timeoutMillis);
-        return webResource.head();
-    }
-
-    /**
-     * Gets a client based on the specified URI.
-     * 
-     * @param uri
-     * @return 
-     */
-    private Client getClient(final SSLContext sslContext) {
-        final Client client;
-        if (sslContext == null) {
-            client = WebUtils.createClient(null);
-        } else {
-            client = WebUtils.createClient(null, sslContext);
-        }
-
-        client.setReadTimeout(READ_TIMEOUT);
-        client.setConnectTimeout(CONNECT_TIMEOUT);
-
-        return client;
-    }
-    
-    
-    /**
-     * Returns the port on which the remote instance is listening for Flow 
File transfers, or <code>null</code> if the remote instance
-     * is not configured to use Site-to-Site transfers.
-     * 
-     * @param uri the base URI of the remote instance. This should include the 
path only to the nifi-api level, as well as the protocol, host, and port.
-     * @param timeoutMillis
-     * @return
-     * @throws IOException
-     */
-    public Integer getRemoteListeningPort(final String uri, final int 
timeoutMillis) throws IOException {
-       try {
-                       final URI uriObject = new URI(uri + 
CONTROLLER_URI_PATH);
-                       return getRemoteListeningPort(uriObject, timeoutMillis);
-               } catch (URISyntaxException e) {
-                       throw new IOException("Unable to establish connection 
to remote host because URI is invalid: " + uri);
-               }
-    }
-    
-    public String getRemoteRootGroupId(final String uri, final int 
timeoutMillis) throws IOException {
-        try {
-            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
-            return getRemoteRootGroupId(uriObject, timeoutMillis);
-        } catch (URISyntaxException e) {
-            throw new IOException("Unable to establish connection to remote 
host because URI is invalid: " + uri);
-        }
-    }
-    
-    public String getRemoteInstanceId(final String uri, final int 
timeoutMillis) throws IOException {
-        try {
-            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
-            return getController(uriObject, timeoutMillis).getInstanceId();
-        } catch (URISyntaxException e) {
-            throw new IOException("Unable to establish connection to remote 
host because URI is invalid: " + uri);
-        }
-    }
-    
-    /**
-     * Returns the port on which the remote instance is listening for Flow 
File transfers, or <code>null</code> if the remote instance
-     * is not configured to use Site-to-Site transfers.
-     * 
-     * @param uri the full URI to fetch, including the path.
-     * @return
-     * @throws IOException
-     */
-    private Integer getRemoteListeningPort(final URI uri, final int 
timeoutMillis) throws IOException {
-       return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
-    }
-    
-    private String getRemoteRootGroupId(final URI uri, final int 
timeoutMillis) throws IOException {
-        return getController(uri, timeoutMillis).getId();
-    }
-    
-    private ControllerDTO getController(final URI uri, final int 
timeoutMillis) throws IOException {
-        final ClientResponse response = get(uri, timeoutMillis);
-        
-        if (Status.OK.getStatusCode() == 
response.getStatusInfo().getStatusCode()) {
-            final ControllerEntity entity = 
response.getEntity(ControllerEntity.class);
-            return entity.getController();
-        } else {
-            final String responseMessage = response.getEntity(String.class);
-            throw new IOException("Got HTTP response Code " + 
response.getStatusInfo().getStatusCode() + ": " + 
response.getStatusInfo().getReasonPhrase() + " with explanation: " + 
responseMessage);
-        }
-    }
-    
-    /**
-     * Issues a registration request on behalf of the current user.
-     * 
-     * @param baseApiUri 
-     * @return  
-     */
-    public ClientResponse issueRegistrationRequest(String baseApiUri) {
-        final URI uri = URI.create(String.format("%s/%s", baseApiUri, 
"/controller/users"));
-
-        // set up the query params
-        MultivaluedMapImpl entity = new MultivaluedMapImpl();
-        entity.add("justification", "A Remote instance of NiFi has attempted 
to create a reference to this NiFi. This action must be approved first.");
-        
-        // create the web resource
-        WebResource webResource = client.resource(uri);
-        
-        // get the client utils and make the request
-        return 
webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 922d4e7..2b27de2 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -56,6 +56,14 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
         }
        }
        
+       public static void rejectCodecNegotiation(final DataInputStream dis, 
final DataOutputStream dos, final String explanation) throws IOException {
+               dis.readUTF();  // read codec name
+               dis.readInt();  // read codec version
+               
+               dos.write(ABORT);
+               dos.writeUTF(explanation);
+               dos.flush();
+       }
        
        @SuppressWarnings("unchecked")
     public static <T extends ClientProtocol> T 
receiveClientProtocolNegotiation(final DataInputStream dis, final 
DataOutputStream dos) throws IOException, HandshakeException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 77ac1a9..82d8206 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.remote;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -33,7 +32,6 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -144,7 +142,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         final EndpointConnectionState connectionState;
         try {
-               connectionState = 
connectionStatePool.getEndpointConnectionState(url, this, transferDirection);
+               connectionState = 
connectionStatePool.getEndpointConnectionState(this, transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
@@ -366,28 +364,4 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public boolean isSideEffectFree() {
         return false;
     }
-
-       @Override
-       public String getDescription() {
-               return toString();
-       }
-
-       @Override
-       public long getCommunicationsTimeout(final TimeUnit timeUnit) {
-               return 
getRemoteProcessGroup().getCommunicationsTimeout(timeUnit);
-       }
-
-       @Override
-       public URI getTargetUri() {
-               return remoteGroup.getTargetUri();
-       }
-       
-       @Override
-       public boolean isSecure() {
-               try {
-                       return remoteGroup.isSecure();
-               } catch (final CommunicationsException ce) {
-                       return false;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 647b45c..887429c 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -204,11 +204,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             ResponseCode.MISSING_PROPERTY.writeResponse(dos, 
HandshakeProperty.GZIP.name());
             throw new HandshakeException("Missing Property " + 
HandshakeProperty.GZIP.name());
         }
-        if ( port == null ) {
-            logger.debug("Responding with ResponseCode MISSING_PROPERTY 
because Port Identifier property is missing");
-            ResponseCode.MISSING_PROPERTY.writeResponse(dos, 
HandshakeProperty.PORT_IDENTIFIER.name());
-            throw new HandshakeException("Missing Property " + 
HandshakeProperty.PORT_IDENTIFIER.name());
-        }
         
         // send "OK" response
         if ( !responseWritten ) {
@@ -243,6 +238,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
         
+        if ( port == null ) {
+               RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot 
transfer FlowFiles because no port was specified");
+        }
+        
         // Negotiate the FlowFileCodec to use.
         try {
             negotiatedFlowFileCodec = 
RemoteResourceFactory.receiveCodecNegotiation(dis, dos);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c174d3a6/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 94de86b..8c972f7 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -16,21 +16,11 @@
  */
 package org.apache.nifi.remote;
 
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 public interface RemoteDestination {
-
-       String getDescription();
-       
        String getIdentifier();
        
-       URI getTargetUri();
-       
-       boolean isSecure();
-       
-       long getCommunicationsTimeout(TimeUnit timeUnit);
-       
        long getYieldPeriod(TimeUnit timeUnit);
        
        boolean isUseCompression();

Reply via email to