http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
 
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
new file mode 100644
index 0000000..656573d
--- /dev/null
+++ 
b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * A factory for creating SSL contexts using the application's security
+ * properties.
+ *
+ */
+public final class SslContextFactory {
+
+    /**
+     * The client-authentication policy a caller requests when building a
+     * context from both a keystore and a truststore.
+     */
+    public static enum ClientAuth {
+
+        WANT,
+        REQUIRED,
+        NONE
+    }
+
+    /**
+     * Creates a SSLContext instance backed by both a keystore and a truststore.
+     *
+     * @param keystore the full path to the keystore
+     * @param keystorePasswd the keystore password; also used as the key password
+     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+     * @param truststore the full path to the truststore
+     * @param truststorePasswd the truststore password
+     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+     * @param clientAuth the type of client authentication
+     * @param protocol the protocol to use for the SSL connection
+     *
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException if any issues accessing the keystore
+     * @throws java.io.IOException for any problems loading the keystores
+     * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown
+     * @throws java.security.cert.CertificateException if there is an issue with the certificate
+     * @throws java.security.UnrecoverableKeyException if a key in the keystore cannot be recovered
+     * @throws java.security.KeyManagementException if unable to manage the key
+     */
+    public static SSLContext createSslContext(
+            final String keystore, final char[] keystorePasswd, final String keystoreType,
+            final String truststore, final char[] truststorePasswd, final String truststoreType,
+            final ClientAuth clientAuth, final String protocol)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        final KeyManagerFactory keyManagerFactory = createKeyManagerFactory(keystore, keystorePasswd, keystoreType);
+        final TrustManagerFactory trustManagerFactory = createTrustManagerFactory(truststore, truststorePasswd, truststoreType);
+
+        // initialize the ssl context
+        final SSLContext sslContext = SSLContext.getInstance(protocol);
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+
+        // NOTE(review): SSLContext.getDefaultSSLParameters() is documented to return a copy of
+        // the default parameters, so the setters below do not appear to alter the returned
+        // context. Callers probably have to apply the client-auth policy to the SSLEngine or
+        // SSLServerSocket they create. The calls are kept to preserve existing behavior.
+        if (ClientAuth.REQUIRED == clientAuth) {
+            sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+        } else if (ClientAuth.WANT == clientAuth) {
+            sslContext.getDefaultSSLParameters().setWantClientAuth(true);
+        } else {
+            sslContext.getDefaultSSLParameters().setWantClientAuth(false);
+        }
+
+        return sslContext;
+
+    }
+
+    /**
+     * Creates a SSLContext instance backed by a keystore only; the context is
+     * initialized with an empty array of trust managers.
+     *
+     * @param keystore the full path to the keystore
+     * @param keystorePasswd the keystore password; also used as the key password
+     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+     * @param protocol the protocol to use for the SSL connection
+     *
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException if any issues accessing the keystore
+     * @throws java.io.IOException for any problems loading the keystores
+     * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown
+     * @throws java.security.cert.CertificateException if there is an issue with the certificate
+     * @throws java.security.UnrecoverableKeyException if a key in the keystore cannot be recovered
+     * @throws java.security.KeyManagementException if unable to manage the key
+     */
+    public static SSLContext createSslContext(
+            final String keystore, final char[] keystorePasswd, final String keystoreType, final String protocol)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        final KeyManagerFactory keyManagerFactory = createKeyManagerFactory(keystore, keystorePasswd, keystoreType);
+
+        // initialize the ssl context
+        final SSLContext ctx = SSLContext.getInstance(protocol);
+        ctx.init(keyManagerFactory.getKeyManagers(), new TrustManager[0], new SecureRandom());
+
+        return ctx;
+
+    }
+
+    /**
+     * Creates a SSLContext instance backed by a truststore only; the context
+     * is initialized with an empty array of key managers.
+     *
+     * @param truststore the full path to the truststore
+     * @param truststorePasswd the truststore password
+     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+     * @param protocol the protocol to use for the SSL connection
+     *
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException if any issues accessing the truststore
+     * @throws java.io.IOException for any problems loading the truststore
+     * @throws java.security.NoSuchAlgorithmException if an algorithm is found to be used but is unknown
+     * @throws java.security.cert.CertificateException if there is an issue with the certificate
+     * @throws java.security.UnrecoverableKeyException declared for signature compatibility
+     * @throws java.security.KeyManagementException if unable to manage the key
+     */
+    public static SSLContext createTrustSslContext(
+            final String truststore, final char[] truststorePasswd, final String truststoreType, final String protocol)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        final TrustManagerFactory trustManagerFactory = createTrustManagerFactory(truststore, truststorePasswd, truststoreType);
+
+        // initialize the ssl context
+        final SSLContext ctx = SSLContext.getInstance(protocol);
+        ctx.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom());
+
+        return ctx;
+
+    }
+
+    /**
+     * Loads the store of the given type from the given file, closing the file
+     * stream once the store has been read.
+     */
+    private static KeyStore loadStore(final String path, final char[] passwd, final String type)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
+        final KeyStore store = KeyStore.getInstance(type);
+        try (final InputStream storeStream = new FileInputStream(path)) {
+            store.load(storeStream, passwd);
+        }
+        return store;
+    }
+
+    /**
+     * Loads the keystore and initializes a default-algorithm KeyManagerFactory
+     * from it, using the keystore password as the key password.
+     */
+    private static KeyManagerFactory createKeyManagerFactory(final String keystore, final char[] keystorePasswd, final String keystoreType)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException {
+        final KeyStore keyStore = loadStore(keystore, keystorePasswd, keystoreType);
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePasswd);
+        return keyManagerFactory;
+    }
+
+    /**
+     * Loads the truststore and initializes a default-algorithm
+     * TrustManagerFactory from it.
+     */
+    private static TrustManagerFactory createTrustManagerFactory(final String truststore, final char[] truststorePasswd, final String truststoreType)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException {
+        final KeyStore trustStore = loadStore(truststore, truststorePasswd, truststoreType);
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(trustStore);
+        return trustManagerFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml 
b/nifi-commons/nifi-site-to-site-client/pom.xml
new file mode 100644
index 0000000..01b3aec
--- /dev/null
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+       Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-site-to-site-client</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-client-dto</artifactId>
+            <version>0.3.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
new file mode 100644
index 0000000..dacfd64
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+/**
+ * Skeleton CommunicationsSession that stores the session URI and the user DN
+ * and exposes them through plain accessors. The URI field is volatile; the
+ * user DN field is not.
+ */
+public abstract class AbstractCommunicationsSession implements CommunicationsSession {
+
+    private volatile String uri;
+
+    private String userDn;
+
+    /**
+     * @param uri the initial URI of this session
+     */
+    public AbstractCommunicationsSession(final String uri) {
+        this.uri = uri;
+    }
+
+    @Override
+    public String getUri() {
+        return uri;
+    }
+
+    @Override
+    public void setUri(final String uri) {
+        this.uri = uri;
+    }
+
+    @Override
+    public String getUserDn() {
+        return userDn;
+    }
+
+    @Override
+    public void setUserDn(final String dn) {
+        this.userDn = dn;
+    }
+
+    /**
+     * @return the current URI of this session
+     */
+    @Override
+    public String toString() {
+        return uri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
new file mode 100644
index 0000000..17b990e
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Represents the remote entity that the client is communicating with.
+ */
+public interface Communicant {
+
+    /**
+     * @return the NiFi site-to-site URL for the remote NiFi instance
+     */
+    String getUrl();
+
+    /**
+     * @return the host of the remote NiFi instance
+     */
+    String getHost();
+
+    /**
+     * @return the port that the remote NiFi instance is listening on for
+     * site-to-site communications
+     */
+    int getPort();
+
+    /**
+     * @return the distinguished name that the remote NiFi instance has provided
+     * in its certificate if using secure communications, or <code>null</code>
+     * if the distinguished name is unknown
+     */
+    String getDistinguishedName();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
new file mode 100644
index 0000000..5cb37b0
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+
+/**
+ * A remote peer reached through a {@link CommunicationsSession}. The host and
+ * port are parsed from the peer URL at construction time, and the peer tracks
+ * per-destination penalties as expiration timestamps in epoch milliseconds.
+ * Equality and hashing are based on the peer URL only.
+ *
+ * NOTE(review): the penalty map is a plain HashMap and the closed flag a plain
+ * boolean; no synchronization is visible in this class.
+ */
+public class Peer implements Communicant {
+
+    private final PeerDescription description;
+    private final CommunicationsSession commsSession;
+    private final String url;
+    private final String clusterUrl;
+    private final String host;
+    private final int port;
+
+    // destination id -> time (epoch millis) at which the penalty expires
+    private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
+    private boolean closed = false;
+
+    /**
+     * @param description description of the peer
+     * @param commsSession the session used to communicate with the peer
+     * @param peerUrl the URL of the peer; its host and port are extracted
+     * @param clusterUrl the URL of the cluster
+     * @throws IllegalArgumentException if peerUrl cannot be parsed as a URI;
+     * the underlying failure is attached as the cause
+     */
+    public Peer(final PeerDescription description, final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
+        this.description = description;
+        this.commsSession = commsSession;
+        this.url = peerUrl;
+        this.clusterUrl = clusterUrl;
+
+        try {
+            final URI uri = new URI(peerUrl);
+            this.port = uri.getPort();
+            this.host = uri.getHost();
+        } catch (final Exception e) {
+            // keep the original failure so callers can see why the URL was rejected
+            throw new IllegalArgumentException("Invalid URL: " + peerUrl, e);
+        }
+    }
+
+    public PeerDescription getDescription() {
+        return description;
+    }
+
+    @Override
+    public String getUrl() {
+        return url;
+    }
+
+    public String getClusterUrl() {
+        return clusterUrl;
+    }
+
+    public CommunicationsSession getCommunicationsSession() {
+        return commsSession;
+    }
+
+    /**
+     * Marks this peer as closed, consumes any remaining input and closes the
+     * underlying communications session. The session is closed even if
+     * consuming the input fails.
+     *
+     * @throws IOException if consuming the input or closing the session fails
+     */
+    public void close() throws IOException {
+        this.closed = true;
+
+        // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        try {
+            commsSession.getInput().consume();
+        } finally {
+            commsSession.close();
+        }
+    }
+
+    /**
+     * Penalizes this peer for the given destination only for the provided
+     * number of milliseconds. An existing penalty is only replaced when the
+     * new one expires later.
+     *
+     * @param destinationId id of destination
+     * @param millis period of time to penalize peer
+     */
+    public void penalize(final String destinationId, final long millis) {
+        final Long currentPenalty = penaltyExpirationMap.get(destinationId);
+        final long proposedPenalty = System.currentTimeMillis() + millis;
+        if (currentPenalty == null || proposedPenalty > currentPenalty) {
+            penaltyExpirationMap.put(destinationId, proposedPenalty);
+        }
+    }
+
+    /**
+     * @param destinationId id of destination
+     * @return <code>true</code> if a penalty for the destination exists and
+     * has not yet expired
+     */
+    public boolean isPenalized(final String destinationId) {
+        final Long currentPenalty = penaltyExpirationMap.get(destinationId);
+        return (currentPenalty != null && currentPenalty > System.currentTimeMillis());
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public String getHost() {
+        return host;
+    }
+
+    @Override
+    public int hashCode() {
+        return 8320 + url.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Peer)) {
+            return false;
+        }
+
+        final Peer other = (Peer) obj;
+        return this.url.equals(other.url);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("Peer[url=").append(url);
+        if (closed) {
+            sb.append(",CLOSED");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public String getDistinguishedName() {
+        return commsSession.getUserDn();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
new file mode 100644
index 0000000..6fc90e4
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerDescription.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Immutable description of a peer: its hostname, its port and whether it is
+ * flagged as secure. Equality and hashing take only the hostname and port into
+ * account; the secure flag is not compared.
+ */
+public class PeerDescription {
+
+    private final String hostname;
+    private final int port;
+    private final boolean secure;
+
+    public PeerDescription(final String hostname, final int port, final boolean secure) {
+        this.hostname = hostname;
+        this.port = port;
+        this.secure = secure;
+    }
+
+    public String getHostname() {
+        return hostname;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public boolean isSecure() {
+        return secure;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("PeerDescription[hostname=");
+        text.append(hostname);
+        text.append(", port=").append(port);
+        text.append(", secure=").append(secure);
+        text.append("]");
+        return text.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        // same 31-based combination of hostname and port; a null hostname hashes as 0
+        final int hostHash = (hostname == null) ? 0 : hostname.hashCode();
+        return 31 * (31 + hostHash) + port;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+
+        final PeerDescription that = (PeerDescription) obj;
+        if (port != that.port) {
+            return false;
+        }
+        return (hostname == null) ? that.hostname == null : hostname.equals(that.hostname);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
new file mode 100644
index 0000000..6c8a4ec
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Pairs a {@link PeerDescription} with the number of FlowFiles reported for
+ * that peer. Equality and hashing are based on the description only; the
+ * FlowFile count is not compared.
+ */
+public class PeerStatus {
+
+    private final PeerDescription description;
+    private final int numFlowFiles;
+
+    /**
+     * @param description the peer this status refers to
+     * @param numFlowFiles the FlowFile count reported for the peer
+     */
+    public PeerStatus(final PeerDescription description, final int numFlowFiles) {
+        this.description = description;
+        this.numFlowFiles = numFlowFiles;
+    }
+
+    public PeerDescription getPeerDescription() {
+        return description;
+    }
+
+    public int getFlowFileCount() {
+        return numFlowFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerStatus[hostname=" + description.getHostname() + ",port=" + description.getPort()
+                + ",secure=" + description.isSecure() + ",flowFileCount=" + numFlowFiles + "]";
+    }
+
+    /**
+     * Hashes on the description's hostname and port. A null hostname hashes as
+     * 0 rather than throwing, because PeerDescription itself accepts a null
+     * hostname in its equals and hashCode.
+     */
+    @Override
+    public int hashCode() {
+        final String hostname = description.getHostname();
+        final int hostHash = (hostname == null) ? 0 : hostname.hashCode();
+        return 9824372 + hostHash + description.getPort() * 41;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof PeerStatus)) {
+            return false;
+        }
+
+        final PeerStatus other = (PeerStatus) obj;
+        return description.equals(other.getPeerDescription());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
new file mode 100644
index 0000000..582916e
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client side of the resource/version handshake: proposes a resource name and
+ * version over the given streams and reacts to the one-byte status the remote
+ * side answers with.
+ */
+public class RemoteResourceInitiator {
+
+    public static final int RESOURCE_OK = 20;
+    public static final int DIFFERENT_RESOURCE_VERSION = 21;
+    public static final int ABORT = 255;
+
+    private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
+
+    /**
+     * Negotiates the given resource with the remote side, re-proposing with a
+     * lower preferred version each time the remote side answers
+     * DIFFERENT_RESOURCE_VERSION.
+     *
+     * @param resource the resource to propose
+     * @param dis stream the remote side's answers are read from
+     * @param dos stream the proposals are written to
+     * @return the resource once the remote side answers RESOURCE_OK, or
+     * <code>null</code> if the status code is not one of the known codes
+     * @throws IOException if reading from or writing to the streams fails
+     * @throws HandshakeException if no common version exists or the remote
+     * side aborts
+     */
+    public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos)
+            throws IOException, HandshakeException {
+        while (true) {
+            // Propose the resource name followed by the version we currently prefer
+            logger.debug("Negotiating resource; proposal is {}", resource);
+            dos.writeUTF(resource.getResourceName());
+            final VersionNegotiator negotiator = resource.getVersionNegotiator();
+            dos.writeInt(negotiator.getVersion());
+            dos.flush();
+
+            // wait for response from server.
+            logger.debug("Receiving response from remote instance");
+            final int statusCode = dis.read();
+
+            if (statusCode == RESOURCE_OK) {
+                // server accepted our proposal of codec name/version
+                logger.debug("Response was RESOURCE_OK");
+                return resource;
+            }
+
+            if (statusCode == ABORT) {
+                logger.debug("Response was ABORT");
+                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
+            }
+
+            if (statusCode != DIFFERENT_RESOURCE_VERSION) {
+                logger.debug("Response was {}; unable to negotiate codec", statusCode);
+                return null; // Unable to negotiate codec
+            }
+
+            // server accepted our proposal of codec name but not the version
+            logger.debug("Response was DIFFERENT_RESOURCE_VERSION");
+            // Get server's preferred version
+            final int newVersion = dis.readInt();
+
+            // Determine our new preferred version that is no greater than the server's preferred version.
+            final Integer newPreference = negotiator.getPreferredVersion(newVersion);
+            // If we could not agree with server on a version, fail now.
+            if (newPreference == null) {
+                throw new HandshakeException("Could not agree on version for " + resource);
+            }
+
+            // Loop around to attempt negotiation again with the new preferred version.
+            negotiator.setVersion(newPreference);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..bfa5c82
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * <p>
+ * Provides a transaction for performing site-to-site data transfers.
+ * </p>
+ *
+ * <p>
+ * A Transaction is created by calling the
+ * {@link 
org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection)
 createTransaction(TransferDirection)}
+ * method of a
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The
+ * resulting Transaction can be used to either send or receive data but not
+ * both. A new Transaction must be created in order to perform the other 
operation.
+ * </p>
+ *
+ * <p>
+ * The general flow of execution of a Transaction is as follows:
+ * <ol>
+ * <li>Create the transaction as described above.</li>
+ * <li>Send data via the {@link #send(DataPacket)} method or receive data via
+ * the {@link #receive()} method. This method will be called 1 or more times. 
In
+ * the case of receive, this method should be called until the method returns
+ * {@code null}, signifying that the remote instance is finished sending data.
+ * <b>Note:</b> <code>receive()</code> should not be called a second time
+ * without first fully consuming the stream from the previous Packet that was
+ * received.</li>
+ * <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ * <li>Either complete the transaction via the {@link #complete()} method
+ * or cancel the transaction via the {@link #cancel(String)} method.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * It is important that the Transaction be terminated in order to free the
+ * resources held by the Transaction. If a Transaction is not terminated, its
+ * resources will not be freed and if the Transaction holds connections from a
+ * connection pool, the connections in that pool will eventually become
+ * exhausted. A Transaction is terminated by calling one of the following
+ * methods:
+ * <ul>
+ * <li>{@link #complete()}</li>
+ * <li>{@link #cancel(String)}</li>
+ * <li>{@link #error()}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * If at any point an IOException is thrown from one of the methods of the
+ * Transaction, that Transaction is automatically closed via a call to
+ * {@link #error()}.
+ * </p>
+ *
+ * <p>
+ * The Transaction class should not be assumed to be thread-safe.
+ * </p>
+ */
+public interface Transaction {
+
+    /**
+     * Sends information to the remote NiFi instance.
+     *
+     * @param dataPacket the data packet to send
+     * @throws IOException if unable to send
+     */
+    void send(DataPacket dataPacket) throws IOException;
+
+    /**
+     * Sends the given byte array as the content of a {@link DataPacket} along
+     * with the provided attributes
+     *
+     * @param content to send
+     * @param attributes of the content
+     * @throws IOException if unable to send
+     */
+    void send(byte[] content, Map<String, String> attributes) throws 
IOException;
+
+    /**
+     * Retrieves information from the remote NiFi instance, if any is 
available.
+     * If no data is available, will return {@code null}. It is important to
+     * consume all data from the remote NiFi instance before attempting to call
+     * {@link #confirm()}. This is because the sender is always responsible for
+     * determining when the Transaction has finished. This is done in order to
+     * prevent the need for a round-trip network request to receive data for
+     * each data packet.
+     *
+     * @return the DataPacket received, or {@code null} if there is no more 
data
+     * to receive.
+     * @throws IOException if unable to receive
+     */
+    DataPacket receive() throws IOException;
+
+    /**
+     * <p>
+     * Confirms the data that was sent or received by comparing CRC32's of the
+     * data sent and the data received.
+     * </p>
+     *
+     * <p>
+     * Even if the protocol being used to send the data is reliable and
+     * guarantees ordering of packets (such as TCP), it is still required that
+     * we confirm the transaction before completing the transaction. This is
+     * done as a "safety net" or a defensive programming technique. Mistakes
+     * happen, and this mechanism helps to ensure that if a bug exists 
somewhere
+     * along the line that we do not end up sending or receiving corrupt data.
+     * If the CRC32 of the sender and the CRC32 of the receiver do not match, 
an
+     * IOException will be thrown and both the sender and receiver will cancel
+     * the transaction automatically.
+     * </p>
+     *
+     * <p>
+     * If the {@link TransferDirection} of this Transaction is RECEIVE, this
+     * method will throw an Exception unless all data from the remote instance
+     * has been consumed (i.e., a call to {@link #receive()} returns
+     * {@code null}).
+     * </p>
+     *
+     * <p>
+     * If the {@link TransferDirection} of this Transaction is SEND, calling
+     * this method dictates that no more data will be sent in this transaction.
+     * I.e., there will be no more calls to {@link #send(DataPacket)}.
+     * </p>
+     *
+     * @throws IOException if unable to confirm transaction
+     */
+    void confirm() throws IOException;
+
+    /**
+     * <p>
+     * Completes the transaction and indicates to both the sender and receiver
+     * that the data transfer was successful.
+     * </p>
+     *
+     * @return a TransactionCompletion that contains details about the
+     * Transaction
+     * @throws IOException if unable to complete
+     */
+    TransactionCompletion complete() throws IOException;
+
+    /**
+     * <p>
+     * Cancels this transaction, indicating to the sender that the data has not
+     * been successfully received so that the sender can retry or handle 
however
+     * is appropriate.
+     * </p>
+     *
+     * @param explanation an explanation to tell the other party why the
+     * transaction was canceled.
+     * @throws IOException if unable to cancel
+     */
+    void cancel(final String explanation) throws IOException;
+
+    /**
+     * <p>
+     * Sets the TransactionState of the Transaction to
+     * {@link TransactionState#ERROR}, and closes the Transaction. The
+     * underlying connection should not be returned to a connection pool in 
this
+     * case.
+     * </p>
+     */
+    void error();
+
+    /**
+     * @return the current state of the Transaction.
+     * @throws IOException if an I/O error occurs while obtaining the state
+     */
+    TransactionState getState() throws IOException;
+
+    /**
+     * @return a Communicant that represents the other side of this Transaction
+     * (i.e., the remote NiFi instance)
+     */
+    Communicant getCommunicant();
+
+    /**
+     * The possible states of a {@link Transaction}, as reported by
+     * {@link Transaction#getState()}.
+     */
+    public enum TransactionState {
+
+        /**
+         * Transaction has been started but no data has been sent or received.
+         */
+        TRANSACTION_STARTED,
+        /**
+         * Transaction has been started and data has been sent or received.
+         */
+        DATA_EXCHANGED,
+        /**
+         * Data that has been transferred has been confirmed via its CRC.
+         * Transaction is ready to be completed.
+         */
+        TRANSACTION_CONFIRMED,
+        /**
+         * Transaction has been successfully completed.
+         */
+        TRANSACTION_COMPLETED,
+        /**
+         * The Transaction has been canceled.
+         */
+        TRANSACTION_CANCELED,
+        /**
+         * The Transaction ended in an error.
+         */
+        ERROR;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
new file mode 100644
index 0000000..1587e87
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * A TransactionCompletion provides information about a {@link Transaction} 
that
+ * has completed successfully.
+ */
+public interface TransactionCompletion {
+
+    /**
+     * When sending to a NiFi instance, the server may accept the content 
sent
+     * to it but indicate that its queues are full and that the client should
+     * back off sending data for a bit.
+     *
+     * @return <code>true</code> if the server did in fact request that,
+     * <code>false</code> otherwise
+     */
+    boolean isBackoff();
+
+    /**
+     * @return the number of Data Packets that were sent to or received from 
the
+     * remote NiFi instance in the Transaction
+     */
+    int getDataPacketsTransferred();
+
+    /**
+     * @return the number of bytes of DataPacket content that were sent to or
+     * received from the remote NiFi instance in the Transaction. Note that 
this
+     * is different than the number of bytes actually transferred between the
+     * client and server, as it does not take into account the attributes or
+     * protocol-specific information that is exchanged but rather takes into
+     * account only the data in the {@link InputStream} of the
+     * {@link DataPacket}
+     */
+    long getBytesTransferred();
+
+    /**
+     * @param timeUnit unit of time for which to report the duration
+     * @return the amount of time that the Transaction took, from the time that
+     * the Transaction was created to the time that the Transaction was
+     * completed
+     */
+    long getDuration(TimeUnit timeUnit);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
new file mode 100644
index 0000000..979ad9c
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * An enumeration for specifying the direction in which data should be
+ * transferred between a client and a remote NiFi instance.
+ */
+public enum TransferDirection {
+
+    /**
+     * The client is to send data to the remote instance. A Transaction
+     * created with this direction is used to send data only.
+     */
+    SEND,
+    /**
+     * The client is to receive data from the remote instance. A Transaction
+     * created with this direction is used to receive data only.
+     */
+    RECEIVE;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
new file mode 100644
index 0000000..bfccd98
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * A named resource that exposes a {@link VersionNegotiator}.
+ * NOTE(review): presumably implemented by resources (e.g. codecs, protocols)
+ * whose version is negotiated with the remote side during a handshake;
+ * confirm against the implementing classes.
+ */
+public interface VersionedRemoteResource {
+
+    /**
+     * @return the VersionNegotiator associated with this resource
+     */
+    VersionNegotiator getVersionNegotiator();
+
+    /**
+     * @return the name of this resource
+     */
+    String getResourceName();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
new file mode 100644
index 0000000..63c3d63
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Serializable;
+
+/**
+ * The keystore formats that can be configured for the keystore and
+ * truststore of a site-to-site client.
+ */
+// Note: every Java enum is already Serializable (java.lang.Enum implements
+// it), so the explicit "implements Serializable" below is redundant but
+// harmless.
+public enum KeystoreType implements Serializable {
+    PKCS12,
+    JKS;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
new file mode 100644
index 0000000..78237b9
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -0,0 +1,771 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * <p>
+ * The SiteToSiteClient provides a mechanism for sending data to a remote
+ * instance of NiFi (or NiFi cluster) and retrieving data from a remote 
instance
+ * of NiFi (or NiFi cluster).
+ * </p>
+ *
+ * <p>
+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the
+ * Builder must be provided the URL of the remote NiFi instance. If the URL
+ * points to a standalone instance of NiFi, all interaction will take place 
with
+ * that instance of NiFi. However, if the URL points to the NiFi Cluster 
Manager
+ * of a cluster, the client will automatically handle load balancing the
+ * transactions across the different nodes in the cluster.
+ * </p>
+ *
+ * <p>
+ * The SiteToSiteClient provides a {@link Transaction} through which all
+ * interaction with the remote instance takes place. After data has been
+ * exchanged or it is determined that no data is available, the Transaction can
+ * then be canceled (via the {@link Transaction#cancel(String)} method) or can
+ * be completed (via the {@link Transaction#complete()} method).
+ * </p>
+ *
+ * <p>
+ * An instance of SiteToSiteClient can be obtained by constructing a new
+ * instance of the {@link SiteToSiteClient.Builder} class, calling the
+ * appropriate methods to configure the client as desired, and then calling 
the
+ * {@link SiteToSiteClient.Builder#build() build()} method.
+ * </p>
+ *
+ * <p>
+ * The SiteToSiteClient itself is immutable once constructed and is 
thread-safe.
+ * Many threads can share access to the same client. However, the
+ * {@link Transaction} that is created by the client is not thread safe and
+ * should not be shared among threads.
+ * </p>
+ */
+public interface SiteToSiteClient extends Closeable {
+
+    /**
+     * <p>
+     * Creates a new Transaction that can be used to either send data to a
+     * remote NiFi instance or receive data from a remote NiFi instance,
+     * depending on the value passed for the {@code direction} argument.
+     * </p>
+     *
+     * <p>
+     * <b>Note:</b> If all of the nodes are penalized (See
+     * {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then this 
method
+     * will return <code>null</code>.
+     * </p>
+     *
+     * @param direction specifies which direction the data should be
+     * transferred. A value of {@link TransferDirection#SEND} indicates that
+     * this Transaction will send data to the remote instance; a value of
+     * {@link TransferDirection#RECEIVE} indicates that this Transaction will 
be
+     * used to receive data from the remote instance.
+     *
+     * @return a Transaction to use for sending or receiving data, or
+     * <code>null</code> if all nodes are penalized.
+     * @throws org.apache.nifi.remote.exception.HandshakeException he
+     * @throws org.apache.nifi.remote.exception.PortNotRunningException pnre
+     * @throws IOException ioe
+     * @throws org.apache.nifi.remote.exception.UnknownPortException upe
+     */
+    Transaction createTransaction(TransferDirection direction) throws 
HandshakeException, PortNotRunningException, ProtocolException, 
UnknownPortException, IOException;
+
+    /**
+     * <p>
+     * In order to determine whether the server is configured for secure
+     * communications, the client may have to query the server's RESTful
+     * interface. Doing so could result in an IOException.
+     * </p>
+     *
+     * @return {@code true} if site-to-site communications with the remote
+     * instance are secure, {@code false} if site-to-site communications with
+     * the remote instance are not secure. Whether or not communications are
+     * secure depends on the server, not the client
+     * @throws IOException if unable to query the remote instance's RESTful
+     * interface or if the remote instance is not configured to allow
+     * site-to-site communications
+     */
+    boolean isSecure() throws IOException;
+
+    /**
+     *
+     * @return the configuration object that was built by the Builder
+     */
+    SiteToSiteClientConfig getConfig();
+
+    /**
+     * <p>
+     * The Builder is the mechanism by which all configuration is passed to the
+     * SiteToSiteClient. Once constructed, the SiteToSiteClient cannot be
+     * reconfigured (i.e., it is immutable). If a change in configuration 
should
+     * be desired, the client should be {@link Closeable#close() closed} and a
+     * new client created.
+     * </p>
+     */
+    public static class Builder implements Serializable {
+
+        private static final long serialVersionUID = -4954962284343090219L;
+
+        private String url;
+        private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
+        private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+        private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
+        private SSLContext sslContext;
+        private String keystoreFilename;
+        private String keystorePass;
+        private KeystoreType keystoreType;
+        private String truststoreFilename;
+        private String truststorePass;
+        private KeystoreType truststoreType;
+        private EventReporter eventReporter;
+        private File peerPersistenceFile;
+        private boolean useCompression;
+        private String portName;
+        private String portIdentifier;
+        private int batchCount;
+        private long batchSize;
+        private long batchNanos;
+
+        /**
+         * Populates the builder with values from the provided config
+         *
+         * @param config to start with
+         * @return the builder
+         */
+        public Builder fromConfig(final SiteToSiteClientConfig config) {
+            // Every field of this builder is overwritten below, so any value
+            // set before this call is lost. The config is dereferenced
+            // without a null check, so a null argument fails with a
+            // NullPointerException.
+            this.url = config.getUrl();
+            // Durations are requested in nanoseconds to match the unit in
+            // which this builder stores them (the *Nanos fields).
+            this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
+            this.penalizationNanos = 
config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+            this.idleExpirationNanos = 
config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
+            // Both the ready-made SSLContext and the keystore/truststore
+            // settings are copied.
+            this.sslContext = config.getSslContext();
+            this.keystoreFilename = config.getKeystoreFilename();
+            this.keystorePass = config.getKeystorePassword();
+            this.keystoreType = config.getKeystoreType();
+            this.truststoreFilename = config.getTruststoreFilename();
+            this.truststorePass = config.getTruststorePassword();
+            this.truststoreType = config.getTruststoreType();
+            this.eventReporter = config.getEventReporter();
+            this.peerPersistenceFile = config.getPeerPersistenceFile();
+            this.useCompression = config.isUseCompression();
+            this.portName = config.getPortName();
+            this.portIdentifier = config.getPortIdentifier();
+            this.batchCount = config.getPreferredBatchCount();
+            this.batchSize = config.getPreferredBatchSize();
+            this.batchNanos = 
config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
+
+            // Return this builder so that further calls can be chained.
+            return this;
+        }
+
+        /**
+         * Specifies the URL of the remote NiFi instance. If this URL points to
+         * the Cluster Manager of a NiFi cluster, data transfer to and from
+         * nodes will be automatically load balanced across the different 
nodes.
+         *
+         * @param url url of remote instance
+         * @return the builder
+         */
+        public Builder url(final String url) {
+            this.url = url;
+            return this;
+        }
+
+        /**
+         * Specifies the communications timeouts to use when interacting with
+         * the remote instances. The default value is 30 seconds.
+         *
+         * @param timeout to use when interacting with remote instances
+         * @param unit unit of time over which to interpret the given timeout
+         * @return the builder
+         */
+        public Builder timeout(final long timeout, final TimeUnit unit) {
+            this.timeoutNanos = unit.toNanos(timeout);
+            return this;
+        }
+
+        /**
+         * Specifies the amount of time that a connection can remain idle in 
the
+         * connection pool before it is "expired" and shutdown. The default
+         * value is 30 seconds.
+         *
+         * @param timeout how long a connection may remain idle in the pool
+         * before it is expired
+         * @param unit unit of time over which to interpret the given timeout
+         * @return the builder
+         */
+        public Builder idleExpiration(final long timeout, final TimeUnit unit) 
{
+            this.idleExpirationNanos = unit.toNanos(timeout);
+            return this;
+        }
+
+        /**
+         * If there is a problem communicating with a node (i.e., any node in
+         * the remote NiFi cluster or the remote instance of NiFi if it is
+         * standalone), specifies how long the client should wait before
+         * attempting to communicate with that node again. While a particular
+         * node is penalized, all other nodes in the remote cluster (if any)
+         * will still be available for communication. The default value is 3
+         * seconds.
+         *
+         * @param period time to wait between communication attempts
+         * @param unit over which to evaluate the given period
+         * @return the builder
+         */
+        public Builder nodePenalizationPeriod(final long period, final 
TimeUnit unit) {
+            this.penalizationNanos = unit.toNanos(period);
+            return this;
+        }
+
+        /**
+         * Specifies the SSL Context to use when communicating with the remote
+         * NiFi instance(s). If not specified, communications will not be
+         * secure. The remote instance of NiFi always determines whether or not
+         * Site-to-Site communications are secure (i.e., the client will always
+         * use secure or non-secure communications, depending on what the 
server
+         * dictates). <b>Note:</b> The SSLContext provided by this method will 
be
+         * ignored if using a Serializable Configuration (see {@link 
#buildSerializableConfig()}).
+         * If a Serializable Configuration is required and communications are 
to be
+         * secure, the {@link #keystoreFilename(String)}, {@link 
#keystorePass(String)},
+         * {@link #keystoreType}, {@link #truststoreFilename}, {@link 
#truststorePass(String)},
+         * and {@link #truststoreType(KeystoreType)} methods must be used 
instead.
+         *
+         * @param sslContext the context
+         * @return the builder
+         */
+        public Builder sslContext(final SSLContext sslContext) {
+            this.sslContext = sslContext;
+            return this;
+        }
+
+        /**
+         * @return the filename to use for the Keystore in order to 
communicate securely
+         * with the remote instance of NiFi
+         */
+        public String getKeystoreFilename() {
+            return keystoreFilename;
+        }
+
+        /**
+         * Sets the filename to use for the Keystore in order to communicate 
securely
+         * with the remote instance of NiFi
+         *
+         * @param keystoreFilename the filename to use for the Keystore in 
order to communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder keystoreFilename(final String keystoreFilename) {
+            this.keystoreFilename = keystoreFilename;
+            return this;
+        }
+
+        /**
+         * @return the password to use for the Keystore in order to 
communicate securely
+         * with the remote instance of NiFi
+         */
+        public String getKeystorePass() {
+            return keystorePass;
+        }
+
+        /**
+         * Sets the password to use for the Keystore in order to communicate 
securely
+         * with the remote instance of NiFi
+         *
+         * @param keystorePass the password to use for the Keystore in order 
to communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder keystorePass(final String keystorePass) {
+            this.keystorePass = keystorePass;
+            return this;
+        }
+
+        /**
+         * @return the type of Keystore to use in order to communicate securely
+         * with the remote instance of NiFi
+         */
+        public KeystoreType getKeystoreType() {
+            return keystoreType;
+        }
+
+        /**
+         * Sets the type of the Keystore to use in order to communicate 
securely
+         * with the remote instance of NiFi
+         *
+         * @param keystoreType the type of the Keystore to use in order to 
communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder keystoreType(final KeystoreType keystoreType) {
+            this.keystoreType = keystoreType;
+            return this;
+        }
+
+        /**
+         * @return the filename to use for the Truststore in order to 
communicate securely
+         * with the remote instance of NiFi
+         */
+        public String getTruststoreFilename() {
+            return truststoreFilename;
+        }
+
+        /**
+         * Sets the filename to use for the Truststore in order to communicate 
securely
+         * with the remote instance of NiFi
+         *
+         * @param truststoreFilename the filename to use for the Truststore in 
order to communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder truststoreFilename(final String truststoreFilename) {
+            this.truststoreFilename = truststoreFilename;
+            return this;
+        }
+
+        /**
+         * @return the password to use for the Truststore in order to 
communicate securely
+         * with the remote instance of NiFi
+         */
+        public String getTruststorePass() {
+            return truststorePass;
+        }
+
+        /**
+         * Sets the password to use for the Truststore in order to communicate 
securely
+         * with the remote instance of NiFi
+         *
+         * @param truststorePass the password to use for the Truststore in order to communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder truststorePass(final String truststorePass) {
+            this.truststorePass = truststorePass;
+            return this;
+        }
+
+        /**
+         * @return the type of the Truststore to use in order to communicate 
securely
+         * with the remote instance of NiFi
+         */
+        public KeystoreType getTruststoreType() {
+            return truststoreType;
+        }
+
+        /**
+         * Sets the type of the Truststore to use in order to communicate securely
+         * with the remote instance of NiFi
+         *
+         * @param truststoreType the type of the Truststore to use in order to 
communicate securely
+         *            with the remote instance of NiFi
+         * @return the builder
+         */
+        public Builder truststoreType(final KeystoreType truststoreType) {
+            this.truststoreType = truststoreType;
+            return this;
+        }
+
+        /**
+         * Provides an EventReporter that can be used by the client in order to
+         * report any events that could be of interest when communicating with
+         * the remote instance. The EventReporter provided must be threadsafe.
+         *
+         * @param eventReporter reporter
+         * @return the builder
+         */
+        public Builder eventReporter(final EventReporter eventReporter) {
+            this.eventReporter = eventReporter;
+            return this;
+        }
+
+        /**
+         * Specifies a file that the client can write to in order to persist 
the
+         * list of nodes in the remote cluster and recover the list of nodes
+         * upon restart. This allows the client to function if the remote
+         * Cluster Manager is unavailable, even after a restart of the client
+         * software. If not specified, the list of nodes will not be persisted
+         * and a failure of the Cluster Manager will result in not being able 
to
+         * communicate with the remote instance if a new client is created.
+         *
+         * @param peerPersistenceFile file
+         * @return the builder
+         */
+        public Builder peerPersistenceFile(final File peerPersistenceFile) {
+            this.peerPersistenceFile = peerPersistenceFile;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not data should be compressed before being
+         * transferred to or from the remote instance.
+         *
+         * @param compress true if should compress
+         * @return the builder
+         */
+        public Builder useCompression(final boolean compress) {
+            this.useCompression = compress;
+            return this;
+        }
+
+        /**
+         * Specifies the name of the port to communicate with. Either the port
+         * name or the port identifier must be specified.
+         *
+         * @param portName name of port
+         * @return the builder
+         */
+        public Builder portName(final String portName) {
+            this.portName = portName;
+            return this;
+        }
+
+        /**
+         * Specifies the unique identifier of the port to communicate with. If
+         * it is known, this is preferred over providing the port name, as the
+         * port name may change.
+         *
+         * @param portIdentifier identifier of port
+         * @return the builder
+         */
+        public Builder portIdentifier(final String portIdentifier) {
+            this.portIdentifier = portIdentifier;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large
+         * a Transaction is. However, the client has the ability to request a
+         * particular batch size/duration. This method specifies the preferred
+         * number of {@link DataPacket}s to include in a Transaction.
+         *
+         * @param count client preferred batch size
+         * @return the builder
+         */
+        public Builder requestBatchCount(final int count) {
+            this.batchCount = count;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large
+         * a Transaction is. However, the client has the ability to request a
+         * particular batch size/duration. This method specifies the preferred
+         * number of bytes to include in a Transaction.
+         *
+         * @param bytes client preferred batch size
+         * @return the builder
+         */
+        public Builder requestBatchSize(final long bytes) {
+            this.batchSize = bytes;
+            return this;
+        }
+
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large
+         * a Transaction is. However, the client has the ability to request a
+         * particular batch size/duration. This method specifies the preferred
+         * amount of time that a Transaction should span.
+         *
+         * @param value client preferred batch duration
+         * @param unit client preferred batch duration unit
+         * @return the builder
+         */
+        public Builder requestBatchDuration(final long value, final TimeUnit 
unit) {
+            this.batchNanos = unit.toNanos(value);
+            return this;
+        }
+
+        /**
+         * @return a {@link SiteToSiteClientConfig} for the configured values
+         * but does not create a SiteToSiteClient
+         */
+        public SiteToSiteClientConfig buildConfig() {
+            return new SiteToSiteClientConfig() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public boolean isUseCompression() {
+                    return Builder.this.isUseCompression();
+                }
+
+                @Override
+                public String getUrl() {
+                    return Builder.this.getUrl();
+                }
+
+                @Override
+                public long getTimeout(final TimeUnit timeUnit) {
+                    return Builder.this.getTimeout(timeUnit);
+                }
+
+                @Override
+                public long getIdleConnectionExpiration(final TimeUnit 
timeUnit) {
+                    return Builder.this.getIdleConnectionExpiration(timeUnit);
+                }
+
+                @Override
+                public SSLContext getSslContext() {
+                    return Builder.this.getSslContext();
+                }
+
+                @Override
+                public String getPortName() {
+                    return Builder.this.getPortName();
+                }
+
+                @Override
+                public String getPortIdentifier() {
+                    return Builder.this.getPortIdentifier();
+                }
+
+                @Override
+                public long getPenalizationPeriod(final TimeUnit timeUnit) {
+                    return Builder.this.getPenalizationPeriod(timeUnit);
+                }
+
+                @Override
+                public File getPeerPersistenceFile() {
+                    return Builder.this.getPeerPersistenceFile();
+                }
+
+                @Override
+                public EventReporter getEventReporter() {
+                    return Builder.this.getEventReporter();
+                }
+
+                @Override
+                public long getPreferredBatchDuration(final TimeUnit timeUnit) 
{
+                    return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS);
+                }
+
+                @Override
+                public long getPreferredBatchSize() {
+                    return batchSize;
+                }
+
+                @Override
+                public int getPreferredBatchCount() {
+                    return batchCount;
+                }
+
+                @Override
+                public String getKeystoreFilename() {
+                    return keystoreFilename;
+                }
+
+                @Override
+                public String getKeystorePassword() {
+                    return keystorePass;
+                }
+
+                @Override
+                public KeystoreType getKeystoreType() {
+                    return keystoreType;
+                }
+
+                @Override
+                public String getTruststoreFilename() {
+                    return truststoreFilename;
+                }
+
+                @Override
+                public String getTruststorePassword() {
+                    return truststorePass;
+                }
+
+                @Override
+                public KeystoreType getTruststoreType() {
+                    return truststoreType;
+                }
+            };
+        }
+
+        /**
+         * @return a new SiteToSiteClient that can be used to send and receive
+         *         data with remote instances of NiFi
+         *
+         * @throws IllegalStateException if either the url is not set or 
neither
+         *             the port name nor port identifier is set.
+         */
+        public SiteToSiteClient build() {
+            if (url == null) {
+                throw new IllegalStateException("Must specify URL to build 
Site-to-Site client");
+            }
+
+            if (portName == null && portIdentifier == null) {
+                throw new IllegalStateException("Must specify either Port Name 
or Port Identifier to build Site-to-Site client");
+            }
+
+            return new SocketClient(buildConfig());
+        }
+
+        /**
+         * @return the configured URL for the remote NiFi instance
+         */
+        public String getUrl() {
+            return url;
+        }
+
+        /**
+         * @param timeUnit unit over which to interpret the timeout
+         * @return the communications timeout
+         */
+        public long getTimeout(final TimeUnit timeUnit) {
+            return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * @param timeUnit unit over which to interpret the time
+         * @return the amount of time that a connection can remain idle in
+         * the connection pool before being shutdown
+         */
+        public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+            return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * @param timeUnit unit of reported time
+         * @return the amount of time that a particular node will be ignored
+         * after a communications error with that node occurs
+         */
+        public long getPenalizationPeriod(TimeUnit timeUnit) {
+            return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+        }
+
+        /**
+         * Returns the SSL Context to use for secure communications. If an
+         * SSLContext was supplied via {@link #sslContext(SSLContext)}, it is
+         * returned as-is. Otherwise a new context is created on each call from
+         * whichever of the keystore and truststore settings are completely
+         * specified (filename, password and type all non-null).
+         *
+         * @return the explicitly configured SSL Context, a newly created one
+         *         based on the keystore and/or truststore properties, or
+         *         <code>null</code> if neither is fully configured
+         * @throws RuntimeException if the keystore or truststore cannot be
+         *             loaded or the SSLContext cannot be initialized
+         */
+        public SSLContext getSslContext() {
+            if (sslContext != null) {
+                return sslContext;
+            }
+
+            final KeyManagerFactory keyManagerFactory;
+            if (keystoreFilename != null && keystorePass != null && keystoreType != null) {
+                try {
+                    // prepare the keystore
+                    final KeyStore keyStore = KeyStore.getInstance(getKeystoreType().name());
+                    try (final InputStream keyStoreStream = new FileInputStream(new File(getKeystoreFilename()))) {
+                        keyStore.load(keyStoreStream, getKeystorePass().toCharArray());
+                    }
+                    keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+                    keyManagerFactory.init(keyStore, getKeystorePass().toCharArray());
+                } catch (final Exception e) {
+                    throw new RuntimeException("Failed to load Keystore", e);
+                }
+            } else {
+                keyManagerFactory = null;
+            }
+
+            final TrustManagerFactory trustManagerFactory;
+            if (truststoreFilename != null && truststorePass != null && truststoreType != null) {
+                try {
+                    // prepare the truststore
+                    final KeyStore trustStore = KeyStore.getInstance(getTruststoreType().name());
+                    try (final InputStream trustStoreStream = new FileInputStream(new File(getTruststoreFilename()))) {
+                        trustStore.load(trustStoreStream, getTruststorePass().toCharArray());
+                    }
+                    trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+                    trustManagerFactory.init(trustStore);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Failed to load Truststore", e);
+                }
+            } else {
+                trustManagerFactory = null;
+            }
+
+            if (keyManagerFactory == null && trustManagerFactory == null) {
+                return null;
+            }
+
+            try {
+                // initialize the ssl context; SSLContext.init accepts null for either
+                // manager array, so a keystore-only or truststore-only configuration
+                // no longer fails with a NullPointerException
+                final SSLContext context = SSLContext.getInstance("TLS");
+                context.init(
+                        keyManagerFactory == null ? null : keyManagerFactory.getKeyManagers(),
+                        trustManagerFactory == null ? null : trustManagerFactory.getTrustManagers(),
+                        new SecureRandom());
+
+                // NOTE(review): the previous getDefaultSSLParameters().setNeedClientAuth(true)
+                // call was dropped; getDefaultSSLParameters() returns a copy, so the call
+                // never changed the context.
+                return context;
+            } catch (final Exception e) {
+                throw new RuntimeException("Created keystore and truststore but failed to initialize SSLContext", e);
+            }
+        }
+
+        /**
+         * @return the EventReporter that is to be used by clients to report
+         * events
+         */
+        public EventReporter getEventReporter() {
+            return eventReporter;
+        }
+
+        /**
+         * @return the file that is to be used for persisting the nodes of a
+         * remote cluster, if any
+         */
+        public File getPeerPersistenceFile() {
+            return peerPersistenceFile;
+        }
+
+        /**
+         * @return a boolean indicating whether or not compression will be used
+         * to transfer data to and from the remote instance
+         */
+        public boolean isUseCompression() {
+            return useCompression;
+        }
+
+        /**
+         * @return the name of the port that the client is to communicate with
+         */
+        public String getPortName() {
+            return portName;
+        }
+
+        /**
+         * @return the identifier of the port that the client is to communicate
+         * with
+         */
+        public String getPortIdentifier() {
+            return portIdentifier;
+        }
+    }
+
+
+    /**
+     * Abstract base type for a {@link SiteToSiteClientConfig} that is also
+     * {@link Serializable}. It declares no members of its own beyond the
+     * serialVersionUID.
+     */
+    public abstract class SerializableSiteToSiteClientConfig implements 
SiteToSiteClientConfig, Serializable {
+        private static final long serialVersionUID = 1L;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
new file mode 100644
index 0000000..50a0d3c
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface SiteToSiteClientConfig extends Serializable {
+
+    /**
+     * @return the configured URL for the remote NiFi instance
+     */
+    String getUrl();
+
+    /**
+     * @param timeUnit unit over which to report the timeout
+     * @return the communications timeout in given unit
+     */
+    long getTimeout(final TimeUnit timeUnit);
+
+    /**
+     * @param timeUnit the unit for which to report the time
+     * @return the amount of time that a connection can remain idle before it 
is
+     * "expired" and shut down
+     */
+    long getIdleConnectionExpiration(TimeUnit timeUnit);
+
+    /**
+     * @param timeUnit unit over which to report the time
+     * @return the amount of time that a particular node will be ignored after 
a
+     * communications error with that node occurs
+     */
+    long getPenalizationPeriod(TimeUnit timeUnit);
+
+    /**
+     * @return the SSL Context that is configured for this configuration
+     */
+    SSLContext getSslContext();
+
+    /**
+     * @return the filename to use for the keystore, or <code>null</code> if 
none is configured
+     */
+    String getKeystoreFilename();
+
+    /**
+     * @return the password to use for the keystore, or <code>null</code> if 
none is configured
+     */
+    String getKeystorePassword();
+
+    /**
+     * @return the Type of the keystore, or <code>null</code> if none is 
configured
+     */
+    KeystoreType getKeystoreType();
+
+    /**
+     * @return the filename to use for the truststore, or <code>null</code> if 
none is configured
+     */
+    String getTruststoreFilename();
+
+    /**
+     * @return the password to use for the truststore, or <code>null</code> if 
none is configured
+     */
+    String getTruststorePassword();
+
+    /**
+     * @return the type of the truststore, or <code>null</code> if none is 
configured
+     */
+    KeystoreType getTruststoreType();
+
+    /**
+     * @return the file that is to be used for persisting the nodes of a remote
+     *         cluster, if any
+     */
+    File getPeerPersistenceFile();
+
+    /**
+     * @return a boolean indicating whether or not compression will be used to
+     * transfer data to and from the remote instance
+     */
+    boolean isUseCompression();
+
+    /**
+     * @return the name of the port that the client is to communicate with
+     */
+    String getPortName();
+
+    /**
+     * @return the identifier of the port that the client is to communicate 
with
+     */
+    String getPortIdentifier();
+
+    /**
+     * When pulling data from a NiFi instance, the sender chooses how large a
+     * Transaction is. However, the client has the ability to request a
+     * particular batch size/duration.
+     *
+     * @param timeUnit unit of time over which to report the duration
+     * @return the maximum amount of time that we will request a NiFi instance
+     * to send data to us in a Transaction
+     */
+    long getPreferredBatchDuration(TimeUnit timeUnit);
+
+    /**
+     * When pulling data from a NiFi instance, the sender chooses how large a
+     * Transaction is. However, the client has the ability to request a
+     * particular batch size/duration.
+     *
+     * @return returns the maximum number of bytes that we will request a NiFi
+     * instance to send data to us in a Transaction
+     */
+    long getPreferredBatchSize();
+
+    /**
+     * When pulling data from a NiFi instance, the sender chooses how large a
+     * Transaction is. However, the client has the ability to request a
+     * particular batch size/duration.
+     *
+     * @return the maximum number of {@link DataPacket}s that we will request a
+     * NiFi instance to send data to us in a Transaction
+     */
+    int getPreferredBatchCount();
+
+    /**
+     * @return the EventReporter that is to be used by clients to report events
+     */
+    EventReporter getEventReporter();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
new file mode 100644
index 0000000..1a16b02
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+
+/**
+ * Simple holder that groups a {@link Peer} with the
+ * {@link SocketClientProtocol} and {@link FlowFileCodec} associated with it,
+ * and records the time at which the connection was last marked as used.
+ */
+public class EndpointConnection {
+
+    private final Peer peer;
+    private final SocketClientProtocol socketClientProtocol;
+    private final FlowFileCodec codec;
+    // timestamp from System.currentTimeMillis(); remains 0 until setLastTimeUsed() is called
+    private volatile long lastUsed;
+
+    /**
+     * @param peer the peer for this connection
+     * @param socketClientProtocol the protocol associated with the peer
+     * @param codec the codec associated with the peer
+     */
+    public EndpointConnection(final Peer peer, final SocketClientProtocol 
socketClientProtocol, final FlowFileCodec codec) {
+        this.peer = peer;
+        this.socketClientProtocol = socketClientProtocol;
+        this.codec = codec;
+    }
+
+    /**
+     * @return the codec supplied at construction
+     */
+    public FlowFileCodec getCodec() {
+        return codec;
+    }
+
+    /**
+     * @return the protocol supplied at construction
+     */
+    public SocketClientProtocol getSocketClientProtocol() {
+        return socketClientProtocol;
+    }
+
+    /**
+     * @return the peer supplied at construction
+     */
+    public Peer getPeer() {
+        return peer;
+    }
+
+    /**
+     * Records the current time, as reported by
+     * {@link System#currentTimeMillis()}, as the last time this connection
+     * was used.
+     */
+    public void setLastTimeUsed() {
+        lastUsed = System.currentTimeMillis();
+    }
+
+    /**
+     * @return the timestamp recorded by the most recent call to
+     * {@link #setLastTimeUsed()}, or 0 if it has never been called
+     */
+    public long getLastTimeUsed() {
+        return lastUsed;
+    }
+}

Reply via email to