Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client fdf758460 -> a6293e340
NIFI-282: Continue separating site-to-site functionality into utility Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a6293e34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a6293e34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a6293e34 Branch: refs/heads/site-to-site-client Commit: a6293e34086cf449dc11be9105fe0794302b8c5a Parents: fdf7584 Author: Mark Payne <[email protected]> Authored: Mon Jan 19 08:31:34 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Jan 19 08:31:34 2015 -0500 ---------------------------------------------------------------------- .../socket/EndpointConnectionStatePool.java | 4 - .../protocol/socket/SocketClientProtocol.java | 3 +- .../socket/TestEndpointConnectionStatePool.java | 95 +++++++++++++++++++ .../apache/nifi/groups/RemoteProcessGroup.java | 3 + .../nifi/remote/StandardRemoteProcessGroup.java | 14 ++- .../nifi/remote/SocketRemoteSiteListener.java | 2 +- .../nifi/remote/StandardRemoteGroupPort.java | 41 +-------- .../remote/TestStandardRemoteGroupPort.java | 97 -------------------- 8 files changed, 110 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java index 2dd489d..d20fb58 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -518,10 +518,6 @@ public class EndpointConnectionStatePool { } -// private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException { -// return formulateDestinationList(clusterNodeInfo, getConnectableType()); -// } - static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) { final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); final int numDestinations = Math.max(128, nodeInfoSet.size()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 2f4f755..560385c 100644 --- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -59,7 +59,6 @@ import org.slf4j.LoggerFactory; public class SocketClientProtocol implements ClientProtocol { private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); - private RemoteDestination destination; private boolean useCompression; @@ -105,7 +104,7 @@ public class SocketClientProtocol implements ClientProtocol { properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); final CommunicationsSession commsSession = peer.getCommunicationsSession(); - commsSession.setTimeout((int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + commsSession.setTimeout(timeoutMillis); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java new file mode 100644 index 0000000..d8899ea --- /dev/null +++ b/nifi/commons/site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.junit.Test; + +public class TestEndpointConnectionStatePool { + + @Test + public void testFormulateDestinationListForOutput() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + @Test + public void testFormulateDestinationListForOutputHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + + + + @Test + public void testFormulateDestinationListForInputPorts() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + @Test + public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 2e35422..9f2dac8 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; public interface RemoteProcessGroup { @@ -80,6 +81,8 @@ public interface RemoteProcessGroup { void setYieldDuration(final String yieldDuration); String getYieldDuration(); + + EndpointConnectionStatePool getConnectionPool(); /** * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min") http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index bfa3d25..857add9 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -175,6 +175,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } }; + endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile()); + final Runnable socketCleanup = new Runnable() { @Override public void run() { @@ -187,14 +189,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { readLock.unlock(); } - for (final StandardRemoteGroupPort port : ports) { - port.cleanupSockets(); - } + endpointConnectionPool.cleanupExpiredSockets(); } }; - endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile()); - final Runnable refreshPeers = new Runnable() { @Override public void run() { @@ -240,6 +238,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void shutdown() { backgroundThreadExecutor.shutdown(); + endpointConnectionPool.shutdown(); } @Override @@ -1292,6 +1291,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public String getYieldDuration() { return yieldDuration; } + + @Override + public EndpointConnectionStatePool getConnectionPool() { + return endpointConnectionPool; + } @Override public void verifyCanDelete() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index cb2d76d..f053e65 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -198,7 +198,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { protocol.setRootProcessGroup(rootGroup.get()); protocol.setNodeInformant(nodeInformant); - peer = new Peer(commsSession, peerUri); + peer = new Peer(commsSession, peerUri, "nifi://localhost:" + getPort()); LOG.debug("Handshaking...."); protocol.handshake(peer); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 53f998e..77ac1a9 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -16,34 +16,18 @@ */ package org.apache.nifi.remote; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; -import java.net.URISyntaxException; -import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLContext; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; @@ -57,29 +41,20 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.remote.client.socket.EndpointConnectionState; import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.exception.UnknownPortException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; -import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.sun.jersey.api.client.ClientHandlerException; - public class StandardRemoteGroupPort extends RemoteGroupPort { public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String CONTENT_TYPE = "application/octet-stream"; @@ -112,14 +87,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { this.transferDirection = direction; setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); - final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory(); - final File persistenceFile = new File(stateDir, remoteGroup.getIdentifier() + ".peers"); - - // TODO: This should really be constructed in the RemoteProcessGroup and made available to all ports in - // that remote process group. This prevents too many connections from being made and also protects the persistenceFile - // so that only a single thread will ever attempt to write to the file at once. - FIXME(); - connectionStatePool = new EndpointConnectionStatePool(sslContext, remoteGroup.getEventReporter(), persistenceFile); + connectionStatePool = remoteGroup.getConnectionPool(); } @Override @@ -145,8 +113,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } finally { interruptLock.unlock(); } - - connectionStatePool.shutdown(); } @Override @@ -162,11 +128,6 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { } - void cleanupSockets() { - connectionStatePool.cleanupExpiredSockets(); - } - - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { if ( !remoteGroup.isTransmitting() ) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a6293e34/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java deleted file mode 100644 index 7474d38..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import org.apache.nifi.remote.StandardRemoteGroupPort; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformation; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.nifi.connectable.ConnectableType; -import org.junit.Test; - -public class TestStandardRemoteGroupPort { - - @Test - public void testFormulateDestinationListForOutput() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); - collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240)); - collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024)); - collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); - collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - @Test - public void testFormulateDestinationListForOutputHugeDifference() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500)); - collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - - - - @Test - public void testFormulateDestinationListForInputPorts() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); - collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240)); - collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024)); - collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); - collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - @Test - public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500)); - collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } -}
