Repository: nifi
Updated Branches:
  refs/heads/master 0f2ac39f6 -> 8ce2a1b3a


NIFI-3166 This closes #1324. Fix SocketRemoteSiteListener NPE.

- Refactored ServerProtocol.sendPeerList method signature to clarify the
  meaning of arguments, and avoid null pointer exception when converting null 
Integer to int.
- Refactored SocketRemoteSiteListener handleRequest method to make it
  more unit test friendly.
- Added more unit tests.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0ddb90cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0ddb90cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0ddb90cd

Branch: refs/heads/master
Commit: 0ddb90cd2deff32a20df6a7e60009758ce21712d
Parents: 0f2ac39
Author: Koji Kawamura <[email protected]>
Authored: Tue Dec 13 14:33:16 2016 +0900
Committer: joewitt <[email protected]>
Committed: Wed Apr 19 22:18:43 2017 -0700

----------------------------------------------------------------------
 .../nifi/remote/protocol/ServerProtocol.java    |  11 +-
 .../nifi/remote/SocketRemoteSiteListener.java   |  70 ++++---
 .../StandardHttpFlowFileServerProtocol.java     |   4 +-
 .../socket/SocketFlowFileServerProtocol.java    |  12 +-
 .../remote/TestSocketRemoteSiteListener.java    |  79 ++++++++
 .../TestSocketFlowFileServerProtocol.java       | 185 +++++++++++++++++++
 6 files changed, 313 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
index 69ae396..638704f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -27,6 +27,7 @@ import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.VersionedRemoteResource;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
@@ -134,20 +135,14 @@ public interface ServerProtocol extends 
VersionedRemoteResource {
      *
      * @param peer peer
      * @param clusterNodeInfo the cluster information
-     * @param remoteInputHost the remote input host
-     * @param remoteInputPort the remote input port
-     * @param remoteInputHttpPort the remote input http port
-     * @param isSiteToSiteSecure whether site to site is secure
+     * @param self the node which received the request
      *
      * @throws java.io.IOException ioe
      */
     void sendPeerList(
             Peer peer,
             Optional<ClusterNodeInformation> clusterNodeInfo,
-            String remoteInputHost,
-            Integer remoteInputPort,
-            Integer remoteInputHttpPort,
-            boolean isSiteToSiteSecure) throws IOException;
+            NodeInformation self) throws IOException;
 
     void shutdown(Peer peer);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 5222bbc..a367e9e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -18,7 +18,11 @@ package org.apache.nifi.remote;
 
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
 import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
 import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
@@ -257,33 +261,7 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
                                             }
                                         }
 
-                                        LOG.debug("Request type from {} is 
{}", protocol, requestType);
-                                        switch (requestType) {
-                                            case NEGOTIATE_FLOWFILE_CODEC:
-                                                protocol.negotiateCodec(peer);
-                                                break;
-                                            case RECEIVE_FLOWFILES:
-                                                // peer wants to receive 
FlowFiles, so we will transfer FlowFiles.
-                                                
protocol.getPort().transferFlowFiles(peer, protocol);
-                                                break;
-                                            case SEND_FLOWFILES:
-                                                // Peer wants to send 
FlowFiles, so we will receive.
-                                                
protocol.getPort().receiveFlowFiles(peer, protocol);
-                                                break;
-                                            case REQUEST_PEER_LIST:
-                                                final 
Optional<ClusterNodeInformation> nodeInfo = (nodeInformant == null) ? 
Optional.empty() : Optional.of(nodeInformant.getNodeInformation());
-                                                protocol.sendPeerList(
-                                                        peer,
-                                                        nodeInfo,
-                                                        
nifiProperties.getRemoteInputHost(),
-                                                        
nifiProperties.getRemoteInputPort(),
-                                                        
nifiProperties.getRemoteInputHttpPort(),
-                                                        
nifiProperties.isSiteToSiteSecure());
-                                                break;
-                                            case SHUTDOWN:
-                                                protocol.shutdown(peer);
-                                                break;
-                                        }
+                                        handleRequest(protocol, peer, 
requestType);
                                     }
                                     LOG.debug("Finished communicating with {} 
({})", peer, protocol);
                                 } catch (final Exception e) {
@@ -333,6 +311,44 @@ public class SocketRemoteSiteListener implements 
RemoteSiteListener {
         listenerThread.start();
     }
 
+    private void handleRequest(final ServerProtocol protocol, final Peer peer, 
final RequestType requestType)
+            throws IOException, NotAuthorizedException, BadRequestException, 
RequestExpiredException {
+        LOG.debug("Request type from {} is {}", protocol, requestType);
+        switch (requestType) {
+            case NEGOTIATE_FLOWFILE_CODEC:
+                protocol.negotiateCodec(peer);
+                break;
+            case RECEIVE_FLOWFILES:
+                // peer wants to receive FlowFiles, so we will transfer 
FlowFiles.
+                protocol.getPort().transferFlowFiles(peer, protocol);
+                break;
+            case SEND_FLOWFILES:
+                // Peer wants to send FlowFiles, so we will receive.
+                protocol.getPort().receiveFlowFiles(peer, protocol);
+                break;
+            case REQUEST_PEER_LIST:
+                final Optional<ClusterNodeInformation> nodeInfo = 
(nodeInformant == null) ? Optional.empty() : 
Optional.of(nodeInformant.getNodeInformation());
+
+                String remoteInputHostVal = 
nifiProperties.getRemoteInputHost();
+                if (remoteInputHostVal == null) {
+                    remoteInputHostVal = 
InetAddress.getLocalHost().getHostName();
+                }
+                final Boolean isSiteToSiteSecure = 
nifiProperties.isSiteToSiteSecure();
+                final Integer apiPort = isSiteToSiteSecure ? 
nifiProperties.getSslPort() : nifiProperties.getPort();
+                final NodeInformation self = new 
NodeInformation(remoteInputHostVal,
+                        nifiProperties.getRemoteInputPort(),
+                        nifiProperties.getRemoteInputHttpPort(),
+                        apiPort != null ? apiPort : 0, // Avoid potential 
NullPointerException.
+                        isSiteToSiteSecure, 0); // TotalFlowFiles doesn't 
matter if it's a standalone NiFi.
+
+                protocol.sendPeerList(peer, nodeInfo, self);
+                break;
+            case SHUTDOWN:
+                protocol.shutdown(peer);
+                break;
+        }
+    }
+
     private int getPort() {
         return socketPort;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
index 57bebda..f36cfaa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -21,6 +21,7 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.codec.StandardFlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -228,8 +229,7 @@ public class StandardHttpFlowFileServerProtocol extends 
AbstractFlowFileServerPr
     }
 
     @Override
-    public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> 
clusterNodeInfo, String remoteInputHost, Integer remoteInputPort, Integer 
remoteInputHttpPort,
-                             boolean isSiteToSiteSecure) throws IOException {
+    public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> 
clusterNodeInfo, final NodeInformation self) throws IOException {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 6c22ac7..a7c0212 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -19,7 +19,6 @@ package org.apache.nifi.remote.protocol.socket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -155,10 +154,7 @@ public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol
     public void sendPeerList(
             final Peer peer,
             final Optional<ClusterNodeInformation> clusterNodeInfo,
-            final String remoteInputHost,
-        final Integer remoteInputPort,
-        final Integer remoteInputHttpPort,
-            final boolean isSiteToSiteSecure) throws IOException {
+            final NodeInformation self) throws IOException {
         if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been 
completed");
         }
@@ -170,18 +166,12 @@ public class SocketFlowFileServerProtocol extends 
AbstractFlowFileServerProtocol
         final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
         final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
 
-        String remoteInputHostVal = remoteInputHost;
-        if (remoteInputHostVal == null) {
-            remoteInputHostVal = InetAddress.getLocalHost().getHostName();
-        }
         logger.debug("{} Advertising Remote Input host name {}", this, peer);
 
         List<NodeInformation> nodeInfos;
         if (clusterNodeInfo.isPresent()) {
             nodeInfos = new 
ArrayList<>(clusterNodeInfo.get().getNodeInformation());
         } else {
-            final NodeInformation self = new 
NodeInformation(remoteInputHostVal, remoteInputPort, remoteInputHttpPort, 
remoteInputHttpPort,
-                isSiteToSiteSecure, 0);
             nodeInfos = Collections.singletonList(self);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java
new file mode 100644
index 0000000..36cd550
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestSocketRemoteSiteListener.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestSocketRemoteSiteListener {
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+    }
+
+    @Test
+    public void testRequestPeerList() throws Exception {
+        Method method = 
SocketRemoteSiteListener.class.getDeclaredMethod("handleRequest",
+                ServerProtocol.class, Peer.class, RequestType.class);
+        method.setAccessible(true);
+
+        final NiFiProperties nifiProperties = spy(NiFiProperties.class);
+        final int apiPort = 8080;
+        final int remoteSocketPort = 8081;
+        final String remoteInputHost = "node1.example.com";
+        when(nifiProperties.getPort()).thenReturn(apiPort);
+        when(nifiProperties.getRemoteInputHost()).thenReturn(remoteInputHost);
+        when(nifiProperties.getRemoteInputPort()).thenReturn(remoteSocketPort);
+        when(nifiProperties.getRemoteInputHttpPort()).thenReturn(null); // 
Even if HTTP transport is disabled, RAW should work.
+        when(nifiProperties.isSiteToSiteHttpEnabled()).thenReturn(false);
+        when(nifiProperties.isSiteToSiteSecure()).thenReturn(false);
+        final SocketRemoteSiteListener listener = new 
SocketRemoteSiteListener(remoteSocketPort, null, nifiProperties);
+
+        final ServerProtocol serverProtocol = mock(ServerProtocol.class);
+        doAnswer(invocation -> {
+            final NodeInformation self = invocation.getArgumentAt(2, 
NodeInformation.class);
+            // Listener should inform about itself properly:
+            assertEquals(remoteInputHost, self.getSiteToSiteHostname());
+            assertEquals(remoteSocketPort, 
self.getSiteToSitePort().intValue());
+            assertNull(self.getSiteToSiteHttpApiPort());
+            assertEquals(apiPort, self.getAPIPort());
+            return null;
+        }).when(serverProtocol).sendPeerList(any(Peer.class), 
any(Optional.class), any(NodeInformation.class));
+
+        final Peer peer = null;
+        method.invoke(listener, serverProtocol, peer, 
RequestType.REQUEST_PEER_LIST);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0ddb90cd/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
new file mode 100644
index 0000000..77c39bc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/socket/TestSocketFlowFileServerProtocol.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
+import org.apache.nifi.remote.io.socket.SocketChannelInput;
+import org.apache.nifi.remote.io.socket.SocketChannelOutput;
+import org.apache.nifi.remote.protocol.HandshakeProperties;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestSocketFlowFileServerProtocol {
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, 
"src/test/resources/nifi.properties");
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+    }
+
+    private Peer getDefaultPeer(final HandshakeProperties handshakeProperties, 
final OutputStream outputStream) throws IOException {
+        final PeerDescription description = new PeerDescription("peer-host", 
8080, false);
+
+        final byte[] inputBytes;
+        try (final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             final DataOutputStream dos = new DataOutputStream(bos)) {
+
+            dos.writeUTF(handshakeProperties.getCommsIdentifier());
+            dos.writeUTF(handshakeProperties.getTransitUriPrefix());
+            dos.writeInt(1); // num of properties
+            dos.writeUTF(HandshakeProperty.GZIP.name());
+            dos.writeUTF(String.valueOf(handshakeProperties.isUseGzip()));
+            dos.flush();
+
+            inputBytes = bos.toByteArray();
+        }
+
+        final InputStream inputStream = new ByteArrayInputStream(inputBytes);
+
+        final SocketChannelCommunicationsSession commsSession = 
mock(SocketChannelCommunicationsSession.class);
+        final SocketChannelInput channelInput = mock(SocketChannelInput.class);
+        final SocketChannelOutput channelOutput = 
mock(SocketChannelOutput.class);
+        when(commsSession.getInput()).thenReturn(channelInput);
+        when(commsSession.getOutput()).thenReturn(channelOutput);
+
+        when(channelInput.getInputStream()).thenReturn(inputStream);
+        when(channelOutput.getOutputStream()).thenReturn(outputStream);
+
+        final String peerUrl = "http://peer-host:8080/";;
+        final String clusterUrl = "cluster-url";
+        return new Peer(description, commsSession, peerUrl, clusterUrl);
+    }
+
+    private SocketFlowFileServerProtocol 
getDefaultSocketFlowFileServerProtocol() {
+        final StandardVersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
+        final SocketFlowFileServerProtocol protocol = spy(new 
SocketFlowFileServerProtocol());
+        return protocol;
+    }
+
+    @Test
+    public void testSendPeerListStandalone() throws Exception {
+        final SocketFlowFileServerProtocol protocol = 
getDefaultSocketFlowFileServerProtocol();
+        final Optional<ClusterNodeInformation> clusterNodeInfo = 
Optional.empty();
+        final String siteToSiteHostname = "node1.example.com";
+        final Integer siteToSitePort = 8081;
+        final Integer siteToSiteHttpPort = null;
+        final int apiPort = 8080;
+        final boolean isSiteToSiteSecure = true;
+        final int numOfQueuedFlowFiles = 100;
+        final NodeInformation self = new NodeInformation(siteToSiteHostname, 
siteToSitePort, siteToSiteHttpPort,
+                apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles);
+
+        final HandshakeProperties handshakeProperties = new 
HandshakeProperties();
+        handshakeProperties.setCommsIdentifier("communication-identifier");
+        handshakeProperties.setTransitUriPrefix("uri-prefix");
+
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final Peer peer = getDefaultPeer(handshakeProperties, outputStream);
+
+        protocol.handshake(peer);
+        protocol.sendPeerList(peer, clusterNodeInfo, self);
+
+        try (final DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(outputStream.toByteArray()))) {
+            final Response handshakeResponse = Response.read(dis);
+            assertEquals(ResponseCode.PROPERTIES_OK, 
handshakeResponse.getCode());
+
+            final int numPeers = dis.readInt();
+            assertEquals(1, numPeers);
+
+            assertEquals(siteToSiteHostname, dis.readUTF());
+            assertEquals(siteToSitePort.intValue(), dis.readInt());
+            assertEquals(isSiteToSiteSecure, dis.readBoolean());
+            assertEquals(numOfQueuedFlowFiles, dis.readInt());
+
+        }
+    }
+
+    @Test
+    public void testSendPeerListCluster() throws Exception {
+        final SocketFlowFileServerProtocol protocol = 
getDefaultSocketFlowFileServerProtocol();
+        final List<NodeInformation> nodeInfoList = new ArrayList<>();
+        final ClusterNodeInformation clusterNodeInformation = new 
ClusterNodeInformation();
+        clusterNodeInformation.setNodeInformation(nodeInfoList);
+        final Optional<ClusterNodeInformation> clusterNodeInfo = 
Optional.of(clusterNodeInformation);
+
+        for (int i = 0; i < 3; i++) {
+            final String siteToSiteHostname = 
String.format("node%d.example.com", i);
+            final Integer siteToSitePort = 8081;
+            final Integer siteToSiteHttpPort = null;
+            final int apiPort = 8080;
+            final boolean isSiteToSiteSecure = true;
+            final int numOfQueuedFlowFiles = 100 + i;
+            final NodeInformation nodeInformation = new 
NodeInformation(siteToSiteHostname, siteToSitePort, siteToSiteHttpPort,
+                    apiPort, isSiteToSiteSecure, numOfQueuedFlowFiles);
+            nodeInfoList.add(nodeInformation);
+        }
+
+        final NodeInformation self = nodeInfoList.get(0);
+
+        final HandshakeProperties handshakeProperties = new 
HandshakeProperties();
+        handshakeProperties.setCommsIdentifier("communication-identifier");
+        handshakeProperties.setTransitUriPrefix("uri-prefix");
+
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        final Peer peer = getDefaultPeer(handshakeProperties, outputStream);
+
+        protocol.handshake(peer);
+        protocol.sendPeerList(peer, clusterNodeInfo, self);
+
+        try (final DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(outputStream.toByteArray()))) {
+            final Response handshakeResponse = Response.read(dis);
+            assertEquals(ResponseCode.PROPERTIES_OK, 
handshakeResponse.getCode());
+
+            final int numPeers = dis.readInt();
+            assertEquals(nodeInfoList.size(), numPeers);
+
+            for (int i = 0; i < nodeInfoList.size(); i++) {
+                final NodeInformation node = nodeInfoList.get(i);
+                assertEquals(node.getSiteToSiteHostname(), dis.readUTF());
+                assertEquals(node.getSiteToSitePort().intValue(), 
dis.readInt());
+                assertEquals(node.isSiteToSiteSecure(), dis.readBoolean());
+                assertEquals(node.getTotalFlowFiles(), dis.readInt());
+            }
+        }
+    }
+
+}

Reply via email to