Repository: incubator-reef
Updated Branches:
  refs/heads/master 47fbccdaf -> e243535d7


[REEF-247]: Port ranges should be configurable for listening

Defined a port range provider and a default implementation.

JIRA:
  [REEF-247](https://issues.apache.org/jira/browse/REEF-247)

Pull Request:
  This closes #157


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e243535d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e243535d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e243535d

Branch: refs/heads/master
Commit: e243535d762411134bfcc58b773713f25b994329
Parents: 47fbccd
Author: Beysim Sezgin <[email protected]>
Authored: Mon Apr 20 17:31:54 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Apr 23 10:33:38 2015 -0700

----------------------------------------------------------------------
 .../network/impl/MessagingTransportFactory.java |  3 +-
 .../local/client/LocalRuntimeConfiguration.java |  9 ++
 .../runtime/mesos/util/MesosRemoteManager.java  |  3 +-
 .../yarn/client/YarnClientConfiguration.java    |  9 ++
 lang/java/reef-wake/wake/pom.xml                |  4 +
 .../DefaultRemoteManagerImplementation.java     | 18 ++--
 .../wake/remote/ports/RandomRangeIterator.java  | 63 +++++++++++++
 .../wake/remote/ports/RangeTcpPortProvider.java | 90 ++++++++++++++++++
 .../reef/wake/remote/ports/TcpPortProvider.java | 46 ++++++++++
 .../ports/parameters/TcpPortRangeBegin.java     | 30 ++++++
 .../ports/parameters/TcpPortRangeCount.java     | 30 ++++++
 .../ports/parameters/TcpPortRangeTryCount.java  | 30 ++++++
 .../netty/NettyMessagingTransport.java          | 96 ++++++++++++--------
 .../remote/RemoteIdentifierFactoryTest.java     |  3 +-
 .../wake/test/remote/RemoteManagerTest.java     | 16 +++-
 .../reef/wake/test/remote/TestRemote.java       |  4 +-
 16 files changed, 400 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
index 82fd720..f9391bd 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
@@ -24,6 +24,7 @@ import org.apache.reef.wake.impl.SyncStage;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 
@@ -68,7 +69,7 @@ public class MessagingTransportFactory implements 
TransportFactory {
                           final EventHandler<Exception> exHandler) {
 
     final Transport transport = new NettyMessagingTransport(this.localAddress,
-        port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 
3, 10000);
+        port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 
3, 10000, RangeTcpPortProvider.Default);
 
     transport.registerErrorHandler(exHandler);
     return transport;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
index cbcdf66..accb6b0 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java
@@ -32,6 +32,7 @@ import 
org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalImpl;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
 
 import java.util.concurrent.ExecutorService;
 
@@ -74,6 +75,13 @@ public class LocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
   public static final OptionalImpl<LocalAddressProvider> 
LOCAL_ADDRESS_PROVIDER = new OptionalImpl<>();
 
   /**
+   * The class used to restrict tcp port ranges for listening
+   * Note that you will likely want to bind the same class also to 
DRIVER_CONFIGURATION_PROVIDERS to make sure that
+   * the Driver (and the Evaluators) also use it.
+   */
+  public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new 
OptionalImpl<>();
+
+  /**
    * The ConfigurationModule for the local resourcemanager.
    */
   public static final ConfigurationModule CONF = new 
LocalRuntimeConfiguration()
@@ -89,6 +97,7 @@ public class LocalRuntimeConfiguration extends 
ConfigurationModuleBuilder {
       .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
           // Bind LocalAddressProvider
       .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER)
+      .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER)
       .build();
 
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
index 3c3bfd9..9a4f974 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
@@ -24,6 +24,7 @@ import org.apache.reef.wake.remote.RemoteIdentifierFactory;
 import org.apache.reef.wake.remote.RemoteManager;
 import org.apache.reef.wake.remote.RemoteMessage;
 import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 
 import javax.inject.Inject;
 
@@ -43,7 +44,7 @@ public final class MesosRemoteManager {
                      final LocalAddressProvider localAddressProvider) {
     this.factory = factory;
     this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", 
"##UNKNOWN##", 0,
-        codec, mesosErrorHandler, false, 3, 10000, localAddressProvider);
+        codec, mesosErrorHandler, false, 3, 10000, localAddressProvider, 
RangeTcpPortProvider.Default);
   }
 
   public <T> EventHandler<T> getHandler(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
index 5b46da6..115484b 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java
@@ -36,6 +36,7 @@ import org.apache.reef.tang.formats.OptionalImpl;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.util.logging.LoggingSetup;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
 
 /**
  * A ConfigurationModule for the YARN resourcemanager.
@@ -58,6 +59,13 @@ public class YarnClientConfiguration extends 
ConfigurationModuleBuilder {
   public static final OptionalImpl<ConfigurationProvider> 
DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>();
 
   /**
+   * The class used to restrict tcp port ranges for listening
+   * Note that you will likely want to bind the same class also to 
DRIVER_CONFIGURATION_PROVIDERS to make sure that
+   * the Driver (and the Evaluators) also use it.
+   */
+  public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new 
OptionalImpl<>();
+
+  /**
    * The class used to resolve the local address for Wake and HTTP to bind to.
    * Note that you will likely want to bind the same class also to 
DRIVER_CONFIGURATION_PROVIDERS to make sure that
    * the Driver (and the Evaluators) also use it.
@@ -78,6 +86,7 @@ public class YarnClientConfiguration extends 
ConfigurationModuleBuilder {
       .bindSetEntry(DriverConfigurationProviders.class, 
DRIVER_CONFIGURATION_PROVIDERS)
           // Bind LocalAddressProvider
       .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER)
+      .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER)
       .build();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/pom.xml b/lang/java/reef-wake/wake/pom.xml
index 75fc953..bb762ec 100644
--- a/lang/java/reef-wake/wake/pom.xml
+++ b/lang/java/reef-wake/wake/pom.xml
@@ -118,6 +118,10 @@ under the License.
             <groupId>${project.groupId}</groupId>
             <artifactId>tang</artifactId>
         </dependency>
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index ff1679c..f5d7a9c 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -25,6 +25,8 @@ import org.apache.reef.wake.impl.StageManager;
 import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 
@@ -82,7 +84,8 @@ public class DefaultRemoteManagerImplementation implements 
RemoteManager {
         orderingGuarantee,
         numberOfTries,
         retryTimeout,
-        LocalAddressProviderFactory.getInstance());
+        LocalAddressProviderFactory.getInstance(),
+        RangeTcpPortProvider.Default);
 
   }
 
@@ -100,7 +103,8 @@ public class DefaultRemoteManagerImplementation implements 
RemoteManager {
       final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean 
orderingGuarantee,
       final @Parameter(RemoteConfiguration.NumberOfTries.class) int 
numberOfTries,
       final @Parameter(RemoteConfiguration.RetryTimeout.class) int 
retryTimeout,
-      final LocalAddressProvider localAddressProvider) {
+      final LocalAddressProvider localAddressProvider,
+      final TcpPortProvider tcpPortProvider) {
 
     this.name = name;
     this.handlerContainer = new HandlerContainer<>(name, codec);
@@ -109,13 +113,9 @@ public class DefaultRemoteManagerImplementation implements 
RemoteManager {
         new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
         new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
 
-    if ("##UNKNOWN##".equals(hostAddress)) {
-      this.transport = new NettyMessagingTransport(
-          localAddressProvider.getLocalAddress(), listeningPort, 
this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout);
-    } else {
-      this.transport = new NettyMessagingTransport(
-          hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, 
numberOfTries, retryTimeout);
-    }
+    final String host = "##UNKNOWN##".equals(hostAddress) ? 
localAddressProvider.getLocalAddress() : hostAddress;
+    this.transport = new NettyMessagingTransport(
+            host, listeningPort, this.reRecvStage, this.reRecvStage, 
numberOfTries, retryTimeout, tcpPortProvider);
 
     this.handlerContainer.setTransport(this.transport);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
new file mode 100644
index 0000000..dbb47c4
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * This class will give out random port numbers between tcpPortRangeBegin and 
tcpPortRangeBegin+tcpPortRangeCount
+ * Max number of ports given is tryCount
+ */
+@ThreadSafe
+final class RandomRangeIterator implements Iterator<Integer> {
+  private final int tcpPortRangeBegin;
+  private final int tcpPortRangeCount;
+  private final int tryCount;
+  private int currentRetryCount;
+  private final Random random = new Random(System.currentTimeMillis());
+
+  RandomRangeIterator(final int tcpPortRangeBegin, final int 
tcpPortRangeCount, int tryCount) {
+    this.tcpPortRangeBegin = tcpPortRangeBegin;
+    this.tcpPortRangeCount = tcpPortRangeCount;
+    this.tryCount = tryCount;
+  }
+
+  @Override
+  public synchronized boolean hasNext() {
+    return currentRetryCount++ < tryCount;
+  }
+
+  @Override
+  public synchronized Integer next() {
+    return random.nextInt(tcpPortRangeCount) + tcpPortRangeBegin;
+  }
+
+  /**
+   * always throws
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException ();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
new file mode 100644
index 0000000..64b8de4
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports;
+
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount;
+
+import javax.inject.Inject;
+import java.util.Iterator;
+import java.util.logging.Logger;
+
+/**
+ * A TcpPortProvider which gives out random ports in a range
+ */
+public final class RangeTcpPortProvider implements TcpPortProvider, 
ConfigurationProvider {
+  private final int portRangeBegin;
+  private final int portRangeCount;
+  private final int portRangeTryCount;
+  private static final Logger LOG = 
Logger.getLogger(RangeTcpPortProvider.class.getName());
+
+  @Inject
+  public RangeTcpPortProvider(final @Parameter(TcpPortRangeBegin.class) int 
portRangeBegin,
+                              final @Parameter(TcpPortRangeCount.class) int 
portRangeCount,
+                              final @Parameter(TcpPortRangeTryCount.class) int 
portRangeTryCount) {
+    this.portRangeBegin = portRangeBegin;
+    this.portRangeCount = portRangeCount;
+    this.portRangeTryCount = portRangeTryCount;
+  }
+
+  /**
+   * Returns an iterator over a set of tcp ports
+   *
+   * @return an Iterator.
+   */
+  @Override
+  public Iterator<Integer> iterator() {
+    return new RandomRangeIterator(portRangeBegin, portRangeCount, 
portRangeTryCount);
+  }
+
+  /**
+   * @deprecated have an instance injected instead.
+   */
+  @Deprecated
+  public static final RangeTcpPortProvider Default = new RangeTcpPortProvider(
+      Integer.parseInt(TcpPortRangeBegin.default_value),
+      Integer.parseInt(TcpPortRangeCount.default_value),
+          Integer.parseInt(TcpPortRangeTryCount.default_value));
+
+
+  @Override
+  public Configuration getConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(TcpPortRangeBegin.class, 
String.valueOf(portRangeBegin))
+        .bindNamedParameter(TcpPortRangeCount.class, 
String.valueOf(portRangeCount))
+        .bindNamedParameter(TcpPortRangeTryCount.class, 
String.valueOf(portRangeTryCount))
+        .bindImplementation(TcpPortProvider.class, RangeTcpPortProvider.class)
+        .build();
+  }
+
+  @Override
+  public String toString() {
+    return "RangeTcpPortProvider{" +
+        "portRangeBegin=" + portRangeBegin +
+        ", portRangeCount=" + portRangeCount +
+        ", portRangeTryCount=" + portRangeTryCount +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java
new file mode 100644
index 0000000..566b62b
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+import java.util.Iterator;
+
+/**
+ * Provides an iterator that returns port numbers.
+*/
+@DefaultImplementation(RangeTcpPortProvider.class)
+public interface TcpPortProvider extends Iterable<Integer> {
+  /**
+   * Returns an iterator over a set of tcp ports
+   *
+   * @return an Iterator.
+   */
+  @Override
+  Iterator<Integer> iterator();
+
+  /**
+   * returns a configuration for the class that implements TcpPortProvider so 
that class can be instantiated
+   * somewhere else
+   *
+   * @return Configuration.
+   */
+  Configuration getConfiguration();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
new file mode 100644
index 0000000..83bf9a1
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * First tcp port number to try
+ */
+@NamedParameter(doc = "First tcp port number to try", default_value = 
TcpPortRangeBegin.default_value)
+public class TcpPortRangeBegin implements Name<Integer> {
+  public static final String default_value = "10000";
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
new file mode 100644
index 0000000..e5727e2
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Number of tcp ports in the range
+ */
+@NamedParameter(doc = "Number of tcp ports in the range", default_value = 
TcpPortRangeCount.default_value)
+public class TcpPortRangeCount implements Name<Integer> {
+  public static final String default_value = "10000";
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
new file mode 100644
index 0000000..7aaa4af
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.ports.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Max number tries for port numbers
+ */
+@NamedParameter(doc = "Max number tries for port numbers", default_value = 
TcpPortRangeTryCount.default_value)
+public class TcpPortRangeTryCount implements Name<Integer> {
+  public static final String default_value = "1000";
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index f9999ae..a1f0886 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -30,30 +30,31 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.DefaultThreadFactory;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import 
org.apache.reef.wake.remote.transport.exception.TransportRuntimeException;
 
 import java.io.IOException;
 import java.net.BindException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Random;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.reef.wake.EStage;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.impl.DefaultThreadFactory;
-import org.apache.reef.wake.remote.Encoder;
-import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
-import org.apache.reef.wake.remote.impl.TransportEvent;
-import org.apache.reef.wake.remote.transport.Link;
-import org.apache.reef.wake.remote.transport.LinkListener;
-import org.apache.reef.wake.remote.transport.Transport;
-import 
org.apache.reef.wake.remote.transport.exception.TransportRuntimeException;
-
 /**
  * Messaging transport implementation with Netty
  */
@@ -65,9 +66,6 @@ public class NettyMessagingTransport implements Transport {
   private static final int SERVER_BOSS_NUM_THREADS = 3;
   private static final int SERVER_WORKER_NUM_THREADS = 20;
   private static final int CLIENT_WORKER_NUM_THREADS = 10;
-  private static final int PORT_START = 10000;
-  private static final int PORT_RANGE = 10000;
-  private static final Random randPort = new Random();
 
   private final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap = 
new ConcurrentHashMap<>();
 
@@ -100,12 +98,15 @@ public class NettyMessagingTransport implements Transport {
    * @param serverStage   the server-side stage that handles transport events
    * @param numberOfTries the number of tries of connection
    * @param retryTimeout  the timeout of reconnection
+   * @param tcpPortProvider  gives an iterator that produces random tcp ports 
in a range
+   *
    */
   public NettyMessagingTransport(final String hostAddress, int port,
                                  final EStage<TransportEvent> clientStage,
                                  final EStage<TransportEvent> serverStage,
                                  final int numberOfTries,
-                                 final int retryTimeout) {
+                                 final int retryTimeout,
+                                 final TcpPortProvider tcpPortProvider) {
 
     if (port < 0) {
       throw new RemoteRuntimeException("Invalid server port: " + port);
@@ -139,29 +140,31 @@ public class NettyMessagingTransport implements Transport 
{
 
     LOG.log(Level.FINE, "Binding to {0}", port);
 
-    Channel acceptor = null;
-    try {
-      if (port > 0) {
-        acceptor = this.serverBootstrap.bind(new 
InetSocketAddress(hostAddress, port)).sync().channel();
-      } else {
-        while (acceptor == null) {
-          port = randPort.nextInt(PORT_START) + PORT_RANGE;
-          LOG.log(Level.FINEST, "Try port {0}", port);
-          try {
-            acceptor = this.serverBootstrap.bind(new 
InetSocketAddress(hostAddress, port)).sync().channel();
-          } catch (final Exception ex) {
-            if (ex instanceof BindException) {
-              LOG.log(Level.FINEST, "The port {0} is already bound. Try 
again", port);
-            } else {
-              throw ex;
-            }
+  Channel acceptor = null;
+  try {
+    if (port > 0) {
+      acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, 
port)).sync().channel();
+    } else {
+      Iterator<Integer> ports = tcpPortProvider.iterator();
+      while (acceptor == null) {
+        if (!ports.hasNext()) break;
+        port = ports.next();
+        LOG.log(Level.FINEST, "Try port {0}", port);
+        try {
+          acceptor = this.serverBootstrap.bind(new 
InetSocketAddress(hostAddress, port)).sync().channel();
+        } catch (final Exception ex) {
+          if (ex instanceof BindException) {
+            LOG.log(Level.FINEST, "The port {0} is already bound. Try again", 
port);
+          } else {
+            throw ex;
           }
         }
       }
-    } catch (final Exception ex) {
-      final RuntimeException transportException =
-          new TransportRuntimeException("Cannot bind to port " + port);
-      LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex);
+    }
+  } catch (final Exception ex) {
+    final RuntimeException transportException =
+       new TransportRuntimeException("Cannot bind to port " + port);
+    LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex);
 
       this.clientWorkerGroup.shutdownGracefully();
       this.serverBossGroup.shutdownGracefully();
@@ -177,6 +180,27 @@ public class NettyMessagingTransport implements Transport {
   }
 
   /**
+   * Constructs a messaging transport
+   *
+   * @param hostAddress   the server host address
+   * @param port          the server listening port; when it is 0, randomly 
assign a port number
+   * @param clientStage   the client-side stage that handles transport events
+   * @param serverStage   the server-side stage that handles transport events
+   * @param numberOfTries the number of tries of connection
+   * @param retryTimeout  the timeout of reconnection
+   * @deprecated use the constructor that takes a TcpPortProvider instead
+   */
+  @Deprecated
+  public NettyMessagingTransport(final String hostAddress, int port,
+                                 final EStage<TransportEvent> clientStage,
+                                 final EStage<TransportEvent> serverStage,
+                                 final int numberOfTries,
+                                 final int retryTimeout) {
+    this(hostAddress, port, clientStage, serverStage, numberOfTries, 
retryTimeout,
+            RangeTcpPortProvider.Default);
+  }
+
+  /**
    * Closes all channels and releases all resources
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
index c003cb5..48f2c25 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
 import org.apache.reef.wake.remote.impl.MultiCodec;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -70,7 +71,7 @@ public class RemoteIdentifierFactoryTest {
 
     try (final RemoteManager rm = new 
DefaultRemoteManagerImplementation("TestRemoteManager",
         localAddressProvider.getLocalAddress(), port, codec, new 
LoggingEventHandler<Throwable>(), false, 1, 10000,
-        localAddressProvider)) {
+        localAddressProvider, RangeTcpPortProvider.Default)) {
       final RemoteIdentifier id = rm.getMyIdentifier();
 
       final IdentifierFactory factory = new DefaultIdentifierFactory();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
index 888574f..ad17964 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementa
 import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
 import org.apache.reef.wake.remote.impl.MultiCodec;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 import org.apache.reef.wake.test.util.Monitor;
 import org.apache.reef.wake.test.util.TimeoutHandler;
 import org.junit.Assert;
@@ -80,7 +81,8 @@ public class RemoteManagerTest {
     final String hostAddress = localAddressProvider.getLocalAddress();
 
     final RemoteManager rm = new DefaultRemoteManagerImplementation(
-        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider);
+        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000,
+            localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -182,7 +184,8 @@ public class RemoteManagerTest {
     final String hostAddress = localAddressProvider.getLocalAddress();
 
     final RemoteManager rm = new DefaultRemoteManagerImplementation(
-        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), true, 3, 10000, localAddressProvider);
+        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), true, 3, 10000,
+            localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -225,7 +228,8 @@ public class RemoteManagerTest {
     String hostAddress = localAddressProvider.getLocalAddress();
 
     final RemoteManager rm = new DefaultRemoteManagerImplementation(
-        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider);
+        "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000,
+            localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -264,7 +268,8 @@ public class RemoteManagerTest {
     ExceptionHandler errorHandler = new ExceptionHandler(monitor);
 
     try (final RemoteManager rm = new DefaultRemoteManagerImplementation(
-        "name", hostAddress, port, codec, errorHandler, false, 3, 10000, 
localAddressProvider)) {
+        "name", hostAddress, port, codec, errorHandler, false, 3, 10000, 
localAddressProvider,
+            RangeTcpPortProvider.Default)) {
 
       RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
       RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -292,7 +297,8 @@ public class RemoteManagerTest {
 
     String hostAddress = localAddressProvider.getLocalAddress();
     return new DefaultRemoteManagerImplementation(name, hostAddress, localPort,
-        codec, new LoggingEventHandler<Throwable>(), false, retry, 
retryTimeout, localAddressProvider);
+        codec, new LoggingEventHandler<Throwable>(), false, retry, 
retryTimeout,
+            localAddressProvider, RangeTcpPortProvider.Default);
   }
 
   private class SendingRemoteManagerThread implements Callable<Integer> {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
index e12a517..3280bd6 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
@@ -27,6 +27,7 @@ import 
org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import 
org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation;
 import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 
 import javax.inject.Inject;
 import java.net.UnknownHostException;
@@ -46,7 +47,8 @@ public class TestRemote implements Runnable {
     int remotePort = 10001;
     Codec<TestEvent> codec = new TestEventCodec();
     try (RemoteManager rm = new DefaultRemoteManagerImplementation("name", 
hostAddress,
-        myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, 
localAddressProvider)) {
+        myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, 
localAddressProvider,
+            RangeTcpPortProvider.Default)) {
       // proxy handler
       RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
       RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + remotePort);

Reply via email to