This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e659e3b  NIFI-5952 Refactor RAW S2S from nio to socket
e659e3b is described below

commit e659e3b606cc3e41816081046f07d9a8d33c88f6
Author: Koji Kawamura <[email protected]>
AuthorDate: Tue Dec 25 11:35:52 2018 +0900

    NIFI-5952 Refactor RAW S2S from nio to socket
---
 .../client/socket/EndpointConnectionPool.java      |  30 +-
 ...ssion.java => SocketCommunicationsSession.java} |  33 +-
 .../{SocketChannelInput.java => SocketInput.java}  |  34 +-
 ...{SocketChannelOutput.java => SocketOutput.java} |  30 +-
 .../ssl/SSLSocketChannelCommunicationsSession.java | 114 ------
 .../io/socket/ssl/SSLSocketChannelInput.java       |  55 ---
 .../io/socket/ssl/SSLSocketChannelOutput.java      |  44 ---
 .../protocol/socket/SocketClientTransaction.java   |   2 +-
 .../socket/TestSocketClientTransaction.java        |  53 +--
 .../nifi/remote/SocketRemoteSiteListener.java      | 386 +++++++++++----------
 .../nifi/remote/TestStandardRemoteGroupPort.java   | 100 +++---
 .../socket/TestSocketFlowFileServerProtocol.java   |  12 +-
 12 files changed, 362 insertions(+), 531 deletions(-)

diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index e9f2536..17d66da 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -34,11 +34,10 @@ import 
org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.exception.UnreachableClusterException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.security.util.CertificateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +48,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
+import java.net.Socket;
 import java.net.URI;
-import java.nio.channels.SocketChannel;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -450,27 +448,23 @@ public class EndpointConnectionPool implements 
PeerStatusProvider {
                             + " because it requires Secure Site-to-Site 
communications, but this instance is not configured for secure communications");
                 }
 
-                final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, localAddress, true);
-                socketChannel.connect();
-
-                commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel);
+                final Socket socket = 
sslContext.getSocketFactory().createSocket(hostname, port);
+                socket.setSoTimeout(commsTimeout);
+                commsSession = new SocketCommunicationsSession(socket);
 
                 try {
-                    commsSession.setUserDn(socketChannel.getDn());
+                    final String dn = 
CertificateUtils.extractPeerDNFromSSLSocket(socket);
+                    commsSession.setUserDn(dn);
                 } catch (final CertificateException ex) {
                     throw new IOException(ex);
                 }
             } else {
-                final SocketChannel socketChannel = SocketChannel.open();
-                if (localAddress != null) {
-                    final SocketAddress localSocketAddress = new 
InetSocketAddress(localAddress, 0);
-                    socketChannel.socket().bind(localSocketAddress);
-                }
 
-                socketChannel.socket().connect(new InetSocketAddress(hostname, 
port), commsTimeout);
-                socketChannel.socket().setSoTimeout(commsTimeout);
+                final Socket socket = new Socket();
+                socket.connect(new InetSocketAddress(hostname, port), 
commsTimeout);
+                socket.setSoTimeout(commsTimeout);
 
-                commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
+                commsSession = new SocketCommunicationsSession(socket);
             }
 
             
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
similarity index 76%
rename from 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
rename to 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
index e19cd3d..7b62f4b 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketCommunicationsSession.java
@@ -16,38 +16,39 @@
  */
 package org.apache.nifi.remote.io.socket;
 
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-
 import org.apache.nifi.remote.AbstractCommunicationsSession;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+import java.io.IOException;
+import java.net.Socket;
 
-public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSession {
+public class SocketCommunicationsSession extends AbstractCommunicationsSession 
{
 
-    private final SocketChannel channel;
-    private final SocketChannelInput request;
-    private final SocketChannelOutput response;
+    private final Socket socket;
+    private final SocketInput request;
+    private final SocketOutput response;
     private int timeout = 30000;
 
-    public SocketChannelCommunicationsSession(final SocketChannel 
socketChannel) throws IOException {
+    public SocketCommunicationsSession(final Socket socket) throws IOException 
{
         super();
-        request = new SocketChannelInput(socketChannel);
-        response = new SocketChannelOutput(socketChannel);
-        channel = socketChannel;
-        socketChannel.configureBlocking(false);
+        this.socket = socket;
+        request = new SocketInput(socket);
+        response = new SocketOutput(socket);
     }
 
     @Override
     public boolean isClosed() {
-        return !channel.isConnected();
+        return socket.isClosed();
     }
 
     @Override
-    public SocketChannelInput getInput() {
+    public CommunicationsInput getInput() {
         return request;
     }
 
     @Override
-    public SocketChannelOutput getOutput() {
+    public CommunicationsOutput getOutput() {
         return response;
     }
 
@@ -74,7 +75,7 @@ public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSe
         }
 
         try {
-            channel.close();
+            socket.close();
         } catch (final IOException ioe) {
             if (suppressed != null) {
                 ioe.addSuppressed(suppressed);
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
similarity index 70%
rename from 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
rename to 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
index 5cf2a62..1048073 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketInput.java
@@ -19,21 +19,28 @@ package org.apache.nifi.remote.io.socket;
 import org.apache.nifi.remote.io.InterruptableInputStream;
 import org.apache.nifi.remote.protocol.CommunicationsInput;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.channels.SocketChannel;
+import java.net.Socket;
+import java.net.SocketException;
 
-public class SocketChannelInput implements CommunicationsInput {
+public class SocketInput implements CommunicationsInput {
 
-    private final SocketChannelInputStream socketIn;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SocketInput.class);
+
+    private final Socket socket;
+    private final InputStream socketIn;
     private final ByteCountingInputStream countingIn;
     private final InputStream bufferedIn;
     private final InterruptableInputStream interruptableIn;
 
-    public SocketChannelInput(final SocketChannel socketChannel) throws 
IOException {
-        this.socketIn = new SocketChannelInputStream(socketChannel);
+    public SocketInput(final Socket socket) throws IOException {
+        this.socket = socket;
+        socketIn = socket.getInputStream();
         countingIn = new ByteCountingInputStream(socketIn);
         bufferedIn = new BufferedInputStream(countingIn);
         interruptableIn = new InterruptableInputStream(bufferedIn);
@@ -45,7 +52,11 @@ public class SocketChannelInput implements 
CommunicationsInput {
     }
 
     public void setTimeout(final int millis) {
-        socketIn.setTimeout(millis);
+        try {
+            socket.setSoTimeout(millis);
+        } catch (SocketException e) {
+            LOG.warn("Failed to set socket timeout.", e);
+        }
     }
 
     public boolean isDataAvailable() {
@@ -63,11 +74,18 @@ public class SocketChannelInput implements 
CommunicationsInput {
 
     public void interrupt() {
         interruptableIn.interrupt();
-        socketIn.interrupt();
     }
 
     @Override
     public void consume() throws IOException {
-        socketIn.consume();
+        if (interruptableIn == null || !isDataAvailable()) {
+            return;
+        }
+
+        final byte[] b = new byte[4096];
+        int bytesRead;
+        do {
+            bytesRead = interruptableIn.read(b);
+        } while (bytesRead > 0);
     }
 }
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
similarity index 73%
rename from 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
rename to 
nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
index 859b88c..6676239 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketOutput.java
@@ -16,24 +16,30 @@
  */
 package org.apache.nifi.remote.io.socket;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.SocketChannel;
 import org.apache.nifi.remote.io.InterruptableOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsOutput;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+public class SocketOutput implements CommunicationsOutput {
 
-public class SocketChannelOutput implements CommunicationsOutput {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SocketOutput.class);
 
-    private final SocketChannelOutputStream socketOutStream;
+    private final Socket socket;
     private final ByteCountingOutputStream countingOut;
     private final OutputStream bufferedOut;
     private final InterruptableOutputStream interruptableOut;
 
-    public SocketChannelOutput(final SocketChannel socketChannel) throws 
IOException {
-        socketOutStream = new SocketChannelOutputStream(socketChannel);
-        countingOut = new ByteCountingOutputStream(socketOutStream);
+    public SocketOutput(final Socket socket) throws IOException {
+        this.socket = socket;
+        countingOut = new ByteCountingOutputStream(socket.getOutputStream());
         bufferedOut = new BufferedOutputStream(countingOut);
         interruptableOut = new InterruptableOutputStream(bufferedOut);
     }
@@ -44,7 +50,11 @@ public class SocketChannelOutput implements 
CommunicationsOutput {
     }
 
     public void setTimeout(final int timeout) {
-        socketOutStream.setTimeout(timeout);
+        try {
+            socket.setSoTimeout(timeout);
+        } catch (SocketException e) {
+            LOG.warn("Failed to set socket timeout.", e);
+        }
     }
 
     @Override
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index 73f5a90..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends 
AbstractCommunicationsSession {
-
-    private final SSLSocketChannel channel;
-    private final SSLSocketChannelInput request;
-    private final SSLSocketChannelOutput response;
-
-    public SSLSocketChannelCommunicationsSession(final SSLSocketChannel 
channel) {
-        super();
-        request = new SSLSocketChannelInput(channel);
-        response = new SSLSocketChannelOutput(channel);
-        this.channel = channel;
-    }
-
-    @Override
-    public SSLSocketChannelInput getInput() {
-        return request;
-    }
-
-    @Override
-    public SSLSocketChannelOutput getOutput() {
-        return response;
-    }
-
-    @Override
-    public void setTimeout(final int millis) throws IOException {
-        channel.setTimeout(millis);
-    }
-
-    @Override
-    public int getTimeout() throws IOException {
-        return channel.getTimeout();
-    }
-
-    @Override
-    public void close() throws IOException {
-        IOException suppressed = null;
-
-        try {
-            request.consume();
-        } catch (final IOException ioe) {
-            suppressed = ioe;
-        }
-
-        try {
-            channel.close();
-        } catch (final IOException ioe) {
-            if (suppressed != null) {
-                ioe.addSuppressed(suppressed);
-            }
-
-            throw ioe;
-        }
-
-        if (suppressed != null) {
-            throw suppressed;
-        }
-    }
-
-    @Override
-    public boolean isClosed() {
-        return channel.isClosed();
-    }
-
-    @Override
-    public boolean isDataAvailable() {
-        try {
-            return request.isDataAvailable();
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return response.getBytesWritten();
-    }
-
-    @Override
-    public long getBytesRead() {
-        return request.getBytesRead();
-    }
-
-    @Override
-    public void interrupt() {
-        channel.interrupt();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString() + "[SSLSocketChannel=" + channel + "]";
-    }
-}
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 842b7f2..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
-
-    private final SSLSocketChannelInputStream in;
-    private final ByteCountingInputStream countingIn;
-    private final InputStream bufferedIn;
-
-    public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
-        in = new SSLSocketChannelInputStream(socketChannel);
-        countingIn = new ByteCountingInputStream(in);
-        this.bufferedIn = new BufferedInputStream(countingIn);
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return bufferedIn;
-    }
-
-    public boolean isDataAvailable() throws IOException {
-        return bufferedIn.available() > 0;
-    }
-
-    @Override
-    public long getBytesRead() {
-        return countingIn.getBytesRead();
-    }
-
-    @Override
-    public void consume() throws IOException {
-        in.consume();
-    }
-}
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index d413d0b..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
-
-    private final OutputStream out;
-    private final ByteCountingOutputStream countingOut;
-
-    public SSLSocketChannelOutput(final SSLSocketChannel channel) {
-        countingOut = new ByteCountingOutputStream(new 
SSLSocketChannelOutputStream(channel));
-        out = new BufferedOutputStream(countingOut);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        return out;
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return countingOut.getBytesWritten();
-    }
-}
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 5b1eb1c..85d6c1a 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -63,7 +63,7 @@ public class SocketClientTransaction extends 
AbstractTransaction {
                         this.dataAvailable = true;
                         break;
                     case NO_MORE_DATA:
-                        logger.debug("{} No data available from {}", peer);
+                        logger.debug("{} No data available from {}", this, 
peer);
                         this.dataAvailable = false;
                         return;
                     default:
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
index ec86dff..048d612 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketClientTransaction.java
@@ -16,25 +16,6 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
-import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
-import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
@@ -43,18 +24,38 @@ import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.NoContentException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelInput;
-import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketInput;
+import org.apache.nifi.remote.io.socket.SocketOutput;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.Response;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.junit.Test;
-import static org.junit.Assert.fail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.createDataPacket;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveOneFlowFile;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveTwoFlowFiles;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execReceiveWithInvalidChecksum;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendButDestinationFull;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendOneFlowFile;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendTwoFlowFiles;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendWithInvalidChecksum;
+import static 
org.apache.nifi.remote.protocol.SiteToSiteTestUtils.execSendZeroFlowFile;
+import static org.apache.nifi.remote.protocol.SiteToSiteTestUtils.readContents;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestSocketClientTransaction {
 
     private Logger logger = 
LoggerFactory.getLogger(TestSocketClientTransaction.class);
@@ -63,9 +64,9 @@ public class TestSocketClientTransaction {
     private SocketClientTransaction getClientTransaction(ByteArrayInputStream 
bis, ByteArrayOutputStream bos, TransferDirection direction) throws IOException 
{
         PeerDescription description = null;
         String peerUrl = "";
-        SocketChannelCommunicationsSession commsSession = 
mock(SocketChannelCommunicationsSession.class);
-        SocketChannelInput socketIn = mock(SocketChannelInput.class);
-        SocketChannelOutput socketOut = mock(SocketChannelOutput.class);
+        SocketCommunicationsSession commsSession = 
mock(SocketCommunicationsSession.class);
+        SocketInput socketIn = mock(SocketInput.class);
+        SocketOutput socketOut = mock(SocketOutput.class);
         when(commsSession.getInput()).thenReturn(socketIn);
         when(commsSession.getOutput()).thenReturn(socketOut);
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 6fa86ff..830d043 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -24,17 +24,17 @@ import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.NotAuthorizedException;
 import org.apache.nifi.remote.exception.RequestExpiredException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.security.util.CertificateUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -42,12 +42,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -90,9 +87,6 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
         final boolean secure = (sslContext != null);
         final List<Thread> threads = new ArrayList<>();
 
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(true);
-        serverSocketChannel.bind(new InetSocketAddress(socketPort));
         stopped.set(false);
 
         final Thread listenerThread = new Thread(new Runnable() {
@@ -100,220 +94,248 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
 
             @Override
             public void run() {
-                while (!stopped.get()) {
-                    LOG.trace("Accepting Connection...");
-                    Socket acceptedSocket = null;
-                    try {
-                        serverSocketChannel.configureBlocking(false);
-                        final ServerSocket serverSocket = 
serverSocketChannel.socket();
-                        serverSocket.setSoTimeout(2000);
-                        while (!stopped.get() && acceptedSocket == null) {
-                            try {
-                                acceptedSocket = serverSocket.accept();
-                            } catch (final SocketTimeoutException ste) {
-                                continue;
-                            }
+
+                try (final ServerSocket serverSocket = createServerSocket()) {
+                    serverSocket.setSoTimeout(2000);
+
+                    while (!stopped.get()) {
+
+                        final Socket acceptedSocket = acceptConnection(serverSocket);
+
+                        if (acceptedSocket == null) {
+                            continue;
                         }
-                    } catch (final IOException e) {
-                        LOG.error("RemoteSiteListener Unable to accept 
connection due to {}", e.toString());
-                        if (LOG.isDebugEnabled()) {
-                            LOG.error("", e);
+
+                        if (stopped.get()) {
+                            break;
                         }
-                        continue;
+
+                        final Thread thread = createWorkerThread(acceptedSocket);
+                        thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
+                        LOG.debug("Handing connection to {}", thread);
+                        thread.start();
+                        threads.add(thread);
+                        threads.removeIf(t -> !t.isAlive());
+
                     }
-                    LOG.trace("Got connection");
 
-                    if (stopped.get()) {
-                        break;
+                } catch (final IOException e) {
+                    LOG.error("Unable to open server socket due to {}", e.toString());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.error("", e);
                     }
+                }
 
-                    final Socket socket = acceptedSocket;
-                    final SocketChannel socketChannel = socket.getChannel();
-                    final Thread thread = new Thread(new Runnable() {
-                        @Override
-                        public void run() {
-                            LOG.debug("{} Determining URL of connection", 
this);
-                            final InetAddress inetAddress = 
socket.getInetAddress();
-                            String clientHostName = inetAddress.getHostName();
-                            final int slashIndex = clientHostName.indexOf("/");
-                            if (slashIndex == 0) {
-                                clientHostName = clientHostName.substring(1);
-                            } else if (slashIndex > 0) {
-                                clientHostName = clientHostName.substring(0, 
slashIndex);
-                            }
 
-                            final int clientPort = socket.getPort();
-                            final String peerUri = "nifi://" + clientHostName 
+ ":" + clientPort;
-                            LOG.debug("{} Connection URL is {}", this, 
peerUri);
+                for(Thread thread : threads) {
+                    if(thread != null) {
+                        thread.interrupt();
+                    }
+                }
+            }
 
-                            final CommunicationsSession commsSession;
-                            final String dn;
-                            try {
-                                if (secure) {
-                                    final SSLSocketChannel sslSocketChannel = 
new SSLSocketChannel(sslContext, socketChannel, false);
-                                    LOG.trace("Channel is secure; 
connecting...");
-                                    sslSocketChannel.connect();
-                                    LOG.trace("Channel connected");
-
-                                    commsSession = new 
SSLSocketChannelCommunicationsSession(sslSocketChannel);
-                                    dn = sslSocketChannel.getDn();
-                                    commsSession.setUserDn(dn);
-                                } else {
-                                    LOG.trace("{} Channel is not secure", 
this);
-                                    commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
-                                    dn = null;
-                                }
-                            } catch (final Exception e) {
-                                LOG.error("RemoteSiteListener Unable to accept 
connection from {} due to {}", socket, e.toString());
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.error("", e);
-                                }
-                                try {
-                                    socketChannel.close();
-                                } catch (IOException swallow) {
-                                }
-                                return;
-                            }
+            private Thread createWorkerThread(Socket socket) {
+                return new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        LOG.debug("{} Determining URL of connection", this);
+                        final InetAddress inetAddress = socket.getInetAddress();
+                        String clientHostName = inetAddress.getHostName();
+                        final int slashIndex = clientHostName.indexOf("/");
+                        if (slashIndex == 0) {
+                            clientHostName = clientHostName.substring(1);
+                        } else if (slashIndex > 0) {
+                            clientHostName = clientHostName.substring(0, slashIndex);
+                        }
 
-                            LOG.info("Received connection from {}, User DN: 
{}", socket.getInetAddress(), dn);
+                        final int clientPort = socket.getPort();
+                        final String peerUri = "nifi://" + clientHostName + ":" + clientPort;
+                        LOG.debug("{} Connection URL is {}", this, peerUri);
 
-                            final InputStream socketIn;
-                            final OutputStream socketOut;
+                        final CommunicationsSession commsSession;
+                        final String dn;
+                        try {
+                            if (secure) {
+                                LOG.trace("{} Connection is secure", this);
+                                dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
 
-                            try {
-                                socketIn = 
commsSession.getInput().getInputStream();
-                                socketOut = 
commsSession.getOutput().getOutputStream();
-                            } catch (final IOException e) {
-                                LOG.error("Connection dropped from {} before 
any data was transmitted", peerUri);
-                                try {
-                                    commsSession.close();
-                                } catch (final IOException ioe) {
-                                }
+                                commsSession = new SocketCommunicationsSession(socket);
+                                commsSession.setUserDn(dn);
 
-                                return;
+                            } else {
+                                LOG.trace("{} Connection is not secure", this);
+                                commsSession = new SocketCommunicationsSession(socket);
+                                dn = null;
+                            }
+                        } catch (final Exception e) {
+                            LOG.error("RemoteSiteListener Unable to accept connection from {} due to {}", socket, e.toString());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.error("", e);
                             }
+                            return;
+                        }
 
-                            final DataInputStream dis = new 
DataInputStream(socketIn);
-                            final DataOutputStream dos = new 
DataOutputStream(socketOut);
+                        LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn);
 
-                            ServerProtocol protocol = null;
-                            Peer peer = null;
+                        final InputStream socketIn;
+                        final OutputStream socketOut;
+
+                        try {
+                            socketIn = commsSession.getInput().getInputStream();
+                            socketOut = commsSession.getOutput().getOutputStream();
+                        } catch (final IOException e) {
+                            LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
                             try {
-                                // ensure that we are communicating with 
another NiFi
-                                LOG.debug("Verifying magic bytes...");
-                                verifyMagicBytes(dis, peerUri);
-
-                                LOG.debug("Receiving Server Protocol 
Negotiation");
-                                protocol = 
RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
-                                protocol.setRootProcessGroup(rootGroup.get());
-                                protocol.setNodeInformant(nodeInformant);
-                                if (protocol instanceof 
PeerDescriptionModifiable) {
-                                    
((PeerDescriptionModifiable)protocol).setPeerDescriptionModifier(peerDescriptionModifier);
-                                }
+                                commsSession.close();
+                            } catch (final IOException ioe) {
+                            }
 
-                                final PeerDescription description = new 
PeerDescription(clientHostName, clientPort, sslContext != null);
-                                peer = new Peer(description, commsSession, 
peerUri, "nifi://localhost:" + getPort());
-                                LOG.debug("Handshaking....");
-                                protocol.handshake(peer);
-
-                                if (!protocol.isHandshakeSuccessful()) {
-                                    LOG.error("Handshake failed with {}; 
closing connection", peer);
-                                    try {
-                                        peer.close();
-                                    } catch (final IOException e) {
-                                        LOG.warn("Failed to close {} due to 
{}", peer, e);
-                                    }
+                            return;
+                        }
+
+                        final DataInputStream dis = new DataInputStream(socketIn);
+                        final DataOutputStream dos = new DataOutputStream(socketOut);
+
+                        ServerProtocol protocol = null;
+                        Peer peer = null;
+                        try {
+                            // ensure that we are communicating with another NiFi
+                            LOG.debug("Verifying magic bytes...");
+                            verifyMagicBytes(dis, peerUri);
+
+                            LOG.debug("Receiving Server Protocol Negotiation");
+                            protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
+                            protocol.setRootProcessGroup(rootGroup.get());
+                            protocol.setNodeInformant(nodeInformant);
+                            if (protocol instanceof PeerDescriptionModifiable) {
+                                ((PeerDescriptionModifiable) protocol).setPeerDescriptionModifier(peerDescriptionModifier);
+                            }
 
-                                    // no need to shutdown protocol because we 
failed to perform handshake
-                                    return;
+                            final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
+                            peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
+                            LOG.debug("Handshaking....");
+                            protocol.handshake(peer);
+
+                            if (!protocol.isHandshakeSuccessful()) {
+                                LOG.error("Handshake failed with {}; closing connection", peer);
+                                try {
+                                    peer.close();
+                                } catch (final IOException e) {
+                                    LOG.warn("Failed to close {} due to {}", peer, e);
                                 }
 
-                                commsSession.setTimeout((int) 
protocol.getRequestExpiration());
+                                // no need to shutdown protocol because we failed to perform handshake
+                                return;
+                            }
 
-                                LOG.info("Successfully negotiated 
ServerProtocol {} Version {} with {}", protocol.getResourceName(), 
protocol.getVersionNegotiator().getVersion(), peer);
+                            commsSession.setTimeout((int) protocol.getRequestExpiration());
 
-                                try {
-                                    while (!protocol.isShutdown()) {
-                                        LOG.trace("Getting Protocol Request 
Type...");
-
-                                        int timeoutCount = 0;
-                                        RequestType requestType = null;
-
-                                        while (requestType == null) {
-                                            try {
-                                                requestType = 
protocol.getRequestType(peer);
-                                            } catch (final 
SocketTimeoutException e) {
-                                                // Give the timeout a bit 
longer (twice as long) to receive the Request Type,
-                                                // in order to attempt to 
receive more data without shutting down the socket if we don't
-                                                // have to.
-                                                LOG.debug("{} Timed out 
waiting to receive RequestType using {} with {}", this, protocol, peer);
-                                                timeoutCount++;
-                                                requestType = null;
-
-                                                if (timeoutCount >= 2) {
-                                                    throw e;
-                                                }
+                            LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}",
+                                protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer);
+
+                            try {
+                                while (!protocol.isShutdown()) {
+                                    LOG.trace("Getting Protocol Request Type...");
+
+                                    int timeoutCount = 0;
+                                    RequestType requestType = null;
+
+                                    while (requestType == null) {
+                                        try {
+                                            requestType = protocol.getRequestType(peer);
+                                        } catch (final SocketTimeoutException e) {
+                                            // Give the timeout a bit longer (twice as long) to receive the Request Type,
+                                            // in order to attempt to receive more data without shutting down the socket if we don't
+                                            // have to.
+                                            LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", this, protocol, peer);
+                                            timeoutCount++;
+                                            requestType = null;
+
+                                            if (timeoutCount >= 2) {
+                                                throw e;
                                             }
                                         }
-
-                                        handleRequest(protocol, peer, 
requestType);
-                                    }
-                                    LOG.debug("Finished communicating with {} 
({})", peer, protocol);
-                                } catch (final Exception e) {
-                                    LOG.error("Unable to communicate with 
remote instance {} ({}) due to {}; closing connection", peer, protocol, 
e.toString());
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.error("", e);
                                     }
+
+                                    handleRequest(protocol, peer, requestType);
                                 }
-                            } catch (final IOException e) {
-                                LOG.error("Unable to communicate with remote 
instance {} due to {}; closing connection", peer, e.toString());
+                                LOG.debug("Finished communicating with {} ({})", peer, protocol);
+                            } catch (final Exception e) {
+                                LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
                                 if (LOG.isDebugEnabled()) {
                                     LOG.error("", e);
                                 }
-                            } catch (final Throwable t) {
-                                LOG.error("Handshake failed when communicating 
with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.error("", t);
-                                }
-                            } finally {
-                                LOG.trace("Cleaning up");
-                                try {
-                                    if (protocol != null && peer != null) {
-                                        protocol.shutdown(peer);
-                                    }
-                                } catch (final Exception protocolException) {
-                                    LOG.warn("Failed to shutdown protocol due 
to {}", protocolException.toString());
+                            }
+                        } catch (final IOException e) {
+                            LOG.error("Unable to communicate with remote instance {} due to {}; closing connection", peer, e.toString());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.error("", e);
+                            }
+                        } catch (final Throwable t) {
+                            LOG.error("Handshake failed when communicating with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
+                            if (LOG.isDebugEnabled()) {
+                                LOG.error("", t);
+                            }
+                        } finally {
+                            LOG.trace("Cleaning up");
+                            try {
+                                if (protocol != null && peer != null) {
+                                    protocol.shutdown(peer);
                                 }
+                            } catch (final Exception protocolException) {
+                                LOG.warn("Failed to shutdown protocol due to {}", protocolException.toString());
+                            }
 
-                                try {
-                                    if (peer != null) {
-                                        peer.close();
-                                    }
-                                } catch (final Exception peerException) {
-                                    LOG.warn("Failed to close peer due to {}; 
some resources may not be appropriately cleaned up", peerException.toString());
+                            try {
+                                if (peer != null) {
+                                    peer.close();
                                 }
-                                LOG.trace("Finished cleaning up");
+                            } catch (final Exception peerException) {
+                                LOG.warn("Failed to close peer due to {}; some resources may not be appropriately cleaned up", peerException.toString());
                             }
+                            LOG.trace("Finished cleaning up");
                         }
-                    });
-                    thread.setName("Site-to-Site Worker Thread-" + 
(threadCount++));
-                    LOG.debug("Handing connection to {}", thread);
-                    thread.start();
-                    threads.add(thread);
-                    threads.removeIf(t -> !t.isAlive());
-                }
-
-                for(Thread thread : threads) {
-                    if(thread != null) {
-                        thread.interrupt();
                     }
-                }
+                });
             }
         });
+
         listenerThread.setName("Site-to-Site Listener");
         listenerThread.start();
     }
 
+    private ServerSocket createServerSocket() throws IOException {
+        if (sslContext != null) {
+            final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(socketPort);
+            ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
+            return serverSocket;
+        } else {
+            return new ServerSocket(socketPort);
+        }
+    }
+
+    private Socket acceptConnection(ServerSocket serverSocket) {
+        LOG.trace("Accepting Connection...");
+        Socket acceptedSocket = null;
+        try {
+            while (!stopped.get() && acceptedSocket == null) {
+                try {
+                    acceptedSocket = serverSocket.accept();
+                } catch (final SocketTimeoutException ste) {
+                    LOG.trace("SocketTimeoutException occurred. {}", ste.getMessage());
+                }
+            }
+        } catch (final IOException e) {
+            LOG.error("RemoteSiteListener Unable to accept connection due to {}", e.toString());
+            if (LOG.isDebugEnabled()) {
+                LOG.error("", e);
+            }
+            return acceptedSocket;
+        }
+        LOG.trace("Got connection");
+        return acceptedSocket;
+    }
+
     private void handleRequest(final ServerProtocol protocol, final Peer peer, final RequestType requestType)
             throws IOException, NotAuthorizedException, BadRequestException, RequestExpiredException {
         LOG.debug("Request type from {} is {}", protocol, requestType);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 0831f51..d7b423d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -31,7 +31,7 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -44,7 +44,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +60,7 @@ import org.apache.nifi.util.NiFiProperties;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -154,28 +154,27 @@ public class TestStandardRemoteGroupPort {
 
         final String peerUrl = "nifi://node1.example.com:9090";
         final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
-        try (final SocketChannel socketChannel = SocketChannel.open()) {
-            final CommunicationsSession commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
-            commsSession.setUserDn("nifi.node1.example.com");
-            final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+        final CommunicationsSession commsSession =  mock(SocketCommunicationsSession.class);
+        when(commsSession.createTransitUri(anyString(), anyString())).thenReturn("nifi://node1.example.com:9090/flowfile-uuid");
+        when(commsSession.getUserDn()).thenReturn("nifi.node1.example.com");
+        final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
-            doReturn(peer).when(transaction).getCommunicant();
+        doReturn(peer).when(transaction).getCommunicant();
 
-            final MockFlowFile flowFile = 
processSession.createFlowFile("0123456789".getBytes());
-            sessionState.getFlowFileQueue().offer(flowFile);
+        final MockFlowFile flowFile = processSession.createFlowFile("0123456789".getBytes());
+        sessionState.getFlowFileQueue().offer(flowFile);
 
-            port.onTrigger(processContext, processSession);
+        port.onTrigger(processContext, processSession);
 
-            // Assert provenance.
-            final List<ProvenanceEventRecord> provenanceEvents = 
sessionState.getProvenanceEvents();
-            assertEquals(1, provenanceEvents.size());
-            final ProvenanceEventRecord provenanceEvent = 
provenanceEvents.get(0);
-            assertEquals(ProvenanceEventType.SEND, 
provenanceEvent.getEventType());
-            assertEquals(peerUrl + "/" + 
flowFile.getAttribute(CoreAttributes.UUID.key()), 
provenanceEvent.getTransitUri());
-            assertEquals("Remote DN=nifi.node1.example.com", 
provenanceEvent.getDetails());
-            assertEquals("remote-group-port-id", 
provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+        assertEquals("nifi://node1.example.com:9090/flowfile-uuid", provenanceEvent.getTransitUri());
+        assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
+        assertEquals("remote-group-port-id", provenanceEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()));
 
-        }
     }
 
     @Test
@@ -186,43 +185,42 @@ public class TestStandardRemoteGroupPort {
 
         final String peerUrl = "nifi://node1.example.com:9090";
         final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
-        try (final SocketChannel socketChannel = SocketChannel.open()) {
-            final CommunicationsSession commsSession = new 
SocketChannelCommunicationsSession(socketChannel);
-            commsSession.setUserDn("nifi.node1.example.com");
-            final Peer peer = new Peer(peerDescription, commsSession, peerUrl, 
REMOTE_CLUSTER_URL);
+        final CommunicationsSession commsSession =  mock(SocketCommunicationsSession.class);
+        when(commsSession.createTransitUri(anyString(), anyString())).thenReturn("nifi://node1.example.com:9090/flowfile-uuid");
+        when(commsSession.getUserDn()).thenReturn("nifi.node1.example.com");
+        final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
-            doReturn(peer).when(transaction).getCommunicant();
+        doReturn(peer).when(transaction).getCommunicant();
+
+        final String sourceFlowFileUuid = "flowfile-uuid";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
+        final byte[] dataPacketContents = "DataPacket Contents".getBytes();
+        final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents);
+        final DataPacket dataPacket = new StandardDataPacket(attributes,
+                dataPacketInputStream, dataPacketContents.length);
 
-            final String sourceFlowFileUuid = "flowfile-uuid";
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
-            final byte[] dataPacketContents = "DataPacket Contents".getBytes();
-            final ByteArrayInputStream dataPacketInputStream = new 
ByteArrayInputStream(dataPacketContents);
-            final DataPacket dataPacket = new StandardDataPacket(attributes,
-                    dataPacketInputStream, dataPacketContents.length);
+        // Return null when it gets called second time.
+        doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
 
-            // Return null when it gets called second time.
-            
doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
+        port.onTrigger(processContext, processSession);
 
-            port.onTrigger(processContext, processSession);
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+        assertEquals("nifi://node1.example.com:9090/flowfile-uuid", provenanceEvent.getTransitUri());
+        assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
 
-            // Assert provenance.
-            final List<ProvenanceEventRecord> provenanceEvents = 
sessionState.getProvenanceEvents();
-            assertEquals(1, provenanceEvents.size());
-            final ProvenanceEventRecord provenanceEvent = 
provenanceEvents.get(0);
-            assertEquals(ProvenanceEventType.RECEIVE, 
provenanceEvent.getEventType());
-            assertEquals(peerUrl + "/" + sourceFlowFileUuid, 
provenanceEvent.getTransitUri());
-            assertEquals("Remote DN=nifi.node1.example.com", 
provenanceEvent.getDetails());
-
-            // Assert received flow files.
-            
processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
-            final List<MockFlowFile> flowFiles = 
processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
-            assertEquals(1, flowFiles.size());
-            final MockFlowFile flowFile = flowFiles.get(0);
-            
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), 
peer.getHost());
-            
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), 
peer.getHost() + ":" + peer.getPort());
-            
flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), 
"remote-group-port-id");
-        }
+        // Assert received flow files.
+        processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.get(0);
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_PORT_ID.key(), "remote-group-port-id");
 
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
index 77c39bc..dbc9c91 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
@@ -21,9 +21,9 @@ import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.SocketChannelInput;
-import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketInput;
+import org.apache.nifi.remote.io.socket.SocketOutput;
 import org.apache.nifi.remote.protocol.HandshakeProperties;
 import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.protocol.Response;
@@ -75,9 +75,9 @@ public class TestSocketFlowFileServerProtocol {
 
         final InputStream inputStream = new ByteArrayInputStream(inputBytes);
 
-        final SocketChannelCommunicationsSession commsSession = 
mock(SocketChannelCommunicationsSession.class);
-        final SocketChannelInput channelInput = mock(SocketChannelInput.class);
-        final SocketChannelOutput channelOutput = 
mock(SocketChannelOutput.class);
+        final SocketCommunicationsSession commsSession = mock(SocketCommunicationsSession.class);
+        final SocketInput channelInput = mock(SocketInput.class);
+        final SocketOutput channelOutput = mock(SocketOutput.class);
         when(commsSession.getInput()).thenReturn(channelInput);
         when(commsSession.getOutput()).thenReturn(channelOutput);
 

Reply via email to