Repository: incubator-reef
Updated Branches:
  refs/heads/master 19fdfcc65 -> 637f9fcaa


[REEF-283] Introduce RemoteManagerFactory

This change adds the new interface `RemoteManagerFactory` together with
its default implementation `DefaultRemoteManagerFactory`. The factory is
injectable, yet offers all the constructors of `DefaultRemoteManager`
through its methods. This allows us to mix and match between Tang
Configuration and ad-hoc overrides.

JIRA:
  [REEF-283](https://issues.apache.org/jira/browse/REEF-283)

Pull Request:
  Closes #164


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/637f9fca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/637f9fca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/637f9fca

Branch: refs/heads/master
Commit: 637f9fcaa33ed18ddeaa1e9830c0f2f5a3df8cb3
Parents: 19fdfcc
Author: Markus Weimer <[email protected]>
Authored: Mon Apr 27 13:19:52 2015 -0700
Committer: Brian Cho <[email protected]>
Committed: Thu Apr 30 18:34:30 2015 +0900

----------------------------------------------------------------------
 .../runtime/mesos/util/MesosRemoteManager.java  |  11 +-
 .../reef/wake/remote/RemoteManagerFactory.java  | 116 ++++++++++++++
 .../impl/DefaultRemoteManagerFactory.java       | 153 +++++++++++++++++++
 .../DefaultRemoteManagerImplementation.java     |   8 +-
 .../remote/RemoteIdentifierFactoryTest.java     |  18 +--
 .../wake/test/remote/RemoteManagerTest.java     |  30 ++--
 .../reef/wake/test/remote/TestRemote.java       |  14 +-
 7 files changed, 309 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
index 9a4f974..20effe0 100644
--- 
a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
+++ 
b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java
@@ -19,12 +19,10 @@
 package org.apache.reef.runtime.mesos.util;
 
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.RemoteIdentifierFactory;
 import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteManagerFactory;
 import org.apache.reef.wake.remote.RemoteMessage;
-import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 
 import javax.inject.Inject;
 
@@ -41,10 +39,11 @@ public final class MesosRemoteManager {
   MesosRemoteManager(final RemoteIdentifierFactory factory,
                      final MesosErrorHandler mesosErrorHandler,
                      final MesosRemoteManagerCodec codec,
-                     final LocalAddressProvider localAddressProvider) {
+                     final RemoteManagerFactory remoteManagerFactory) {
     this.factory = factory;
-    this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", 
"##UNKNOWN##", 0,
-        codec, mesosErrorHandler, false, 3, 10000, localAddressProvider, 
RangeTcpPortProvider.Default);
+    this.raw = remoteManagerFactory.getInstance("MESOS_EXECUTOR", codec, 
mesosErrorHandler);
+
+
   }
 
   public <T> EventHandler<T> getHandler(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
new file mode 100644
index 0000000..4c741de
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.impl.DefaultRemoteManagerFactory;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+
+/**
+ * Injectable Factory for RemoteManager instances.
+ * <p/>
+ * Use when direct injection of the RemoteManager is impossible.
+ */
+@DefaultImplementation(DefaultRemoteManagerFactory.class)
+public interface RemoteManagerFactory {
+
+  /**
+   * @param name the name of the returned RemoteManager to instantiate.
+   * @return a new instance of RemoteManager with all parameters but the given 
one injected via Tang.
+   */
+  RemoteManager getInstance(final String name);
+
+  /**
+   * @param name         the name of the returned RemoteManager to instantiate.
+   * @param codec        the codec to use to decode the messages sent to / by 
this RemoteManager.
+   * @param errorHandler the error handler invoked for exceptions by the 
returned RemoteManager.
+   * @param <T>          the message type sent / received by the returned 
RemoteManager.
+   * @return a new instance of RemoteManager with all parameters but the given 
ones injected via Tang.
+   */
+  <T> RemoteManager getInstance(final String name,
+                                final Codec<T> codec,
+                                final EventHandler<Throwable> errorHandler);
+
+  /**
+   * @param name          the name of the returned RemoteManager to 
instantiate.
+   * @param listeningPort the port on which the returned RemoteManager listens.
+   * @param codec         the codec to use to decode the messages sent to / by 
this RemoteManager.
+   * @param errorHandler  the error handler invoked for exceptions by the 
returned RemoteManager.
+   * @param <T>           the message type sent / received by the returned 
RemoteManager.
+   * @return a new instance of RemoteManager with all parameters but the given 
ones injected via Tang.
+   */
+  <T> RemoteManager getInstance(final String name,
+                                final int listeningPort,
+                                final Codec<T> codec,
+                                final EventHandler<Throwable> errorHandler);
+
+  /**
+   * The old constructor of DefaultRemoteManagerImplementation. Avoid if you 
can.
+   *
+   * @param name              the name of the returned RemoteManager to 
instantiate.
+   * @param hostAddress       the address the returned RemoteManager binds to.
+   * @param listeningPort     the port on which the returned RemoteManager 
listens.
+   * @param codec             the codec to use to decode the messages sent to 
/ by this RemoteManager.
+   * @param errorHandler      the error handler invoked for exceptions by the 
returned RemoteManager.
+   * @param orderingGuarantee whether or not the returned RemoteManager should 
guarantee message orders.
+   * @param numberOfTries     the number of retries before the returned 
RemoteManager declares sending a failure.
+   * @param retryTimeout      the time (in ms) after which the returned 
RemoteManager considers a sending attempt failed.
+   * @param <T>               the message type sent / received by the returned 
RemoteManager.
+   * @return a new instance of RemoteManager with all parameters but the given 
ones injected via Tang.
+   */
+  <T> RemoteManager getInstance(final String name,
+                                final String hostAddress,
+                                final int listeningPort,
+                                final Codec<T> codec,
+                                final EventHandler<Throwable> errorHandler,
+                                final boolean orderingGuarantee,
+                                final int numberOfTries,
+                                final int retryTimeout);
+
+  /**
+   * The all-out constructor of DefaultRemoteManagerImplementation. Avoid if 
you can.
+   *
+   * @param name                 the name of the returned RemoteManager to 
instantiate.
+   * @param hostAddress          the address the returned RemoteManager binds 
to.
+   * @param listeningPort        the port on which the returned RemoteManager 
listens.
+   * @param codec                the codec to use to decode the messages sent 
to / by this RemoteManager.
+   * @param errorHandler         the error handler invoked for exceptions by 
the returned RemoteManager.
+   * @param orderingGuarantee    whether or not the returned RemoteManager 
should guarantee message orders.
+   * @param numberOfTries        the number of retries before the returned 
RemoteManager declares sending a failure.
+   * @param retryTimeout         the time (in ms) after which the returned 
RemoteManager considers a sending attempt failed.
+   * @param localAddressProvider the LocalAddressProvider used by the returned 
RemoteManager to determine the address to bind to.
+   * @param tcpPortProvider      the TcpPortProvider used by the returned 
RemoteManager to determine the port to listen to.
+   * @param <T>                  the message type sent / received by the 
returned RemoteManager.
+   * @return a new instance of RemoteManager configured entirely by the given 
parameters.
+   */
+  <T> RemoteManager getInstance(final String name,
+                                final String hostAddress,
+                                final int listeningPort,
+                                final Codec<T> codec,
+                                final EventHandler<Throwable> errorHandler,
+                                final boolean orderingGuarantee,
+                                final int numberOfTries,
+                                final int retryTimeout,
+                                final LocalAddressProvider 
localAddressProvider,
+                                final TcpPortProvider tcpPortProvider);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
new file mode 100644
index 0000000..e36b4dc
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteManagerFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+
+import javax.inject.Inject;
+
+/**
+ * Default implementation of RemoteManagerFactory.
+ */
+public class DefaultRemoteManagerFactory implements RemoteManagerFactory {
+
+  private final Codec<?> codec;
+  private final EventHandler<Throwable> errorHandler;
+  private final boolean orderingGuarantee;
+  private final int numberOfTries;
+  private final int retryTimeout;
+  private final LocalAddressProvider localAddressProvider;
+  private final TcpPortProvider tcpPortProvider;
+
+  @Inject
+  private DefaultRemoteManagerFactory(
+      final @Parameter(RemoteConfiguration.MessageCodec.class) Codec<?> codec,
+      final @Parameter(RemoteConfiguration.ErrorHandler.class) 
EventHandler<Throwable> errorHandler,
+      final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean 
orderingGuarantee,
+      final @Parameter(RemoteConfiguration.NumberOfTries.class) int 
numberOfTries,
+      final @Parameter(RemoteConfiguration.RetryTimeout.class) int 
retryTimeout,
+      final LocalAddressProvider localAddressProvider,
+      final TcpPortProvider tcpPortProvider) {
+    this.codec = codec;
+    this.errorHandler = errorHandler;
+    this.orderingGuarantee = orderingGuarantee;
+    this.numberOfTries = numberOfTries;
+    this.retryTimeout = retryTimeout;
+    this.localAddressProvider = localAddressProvider;
+    this.tcpPortProvider = tcpPortProvider;
+  }
+
+  @Override
+  public RemoteManager getInstance(final String name) {
+    return new DefaultRemoteManagerImplementation(name,
+        DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to 
use the localAddressProvider
+        0, // Indicate to use the tcpPortProvider
+        this.codec,
+        this.errorHandler,
+        this.orderingGuarantee,
+        this.numberOfTries,
+        this.retryTimeout,
+        this.localAddressProvider,
+        this.tcpPortProvider);
+  }
+
+
+  @Override
+  public <T> RemoteManager getInstance(final String name,
+                                       final String hostAddress,
+                                       final int listeningPort,
+                                       final Codec<T> codec,
+                                       final EventHandler<Throwable> 
errorHandler,
+                                       final boolean orderingGuarantee,
+                                       final int numberOfTries,
+                                       final int retryTimeout,
+                                       final LocalAddressProvider 
localAddressProvider,
+                                       final TcpPortProvider tcpPortProvider) {
+    return new DefaultRemoteManagerImplementation(name,
+        hostAddress,
+        listeningPort,
+        codec,
+        errorHandler,
+        orderingGuarantee,
+        numberOfTries,
+        retryTimeout,
+        localAddressProvider,
+        tcpPortProvider);
+  }
+
+  @Override
+  public <T> RemoteManager getInstance(final String name,
+                                       final String hostAddress,
+                                       final int listeningPort,
+                                       final Codec<T> codec,
+                                       final EventHandler<Throwable> 
errorHandler,
+                                       final boolean orderingGuarantee,
+                                       final int numberOfTries,
+                                       final int retryTimeout) {
+    return new DefaultRemoteManagerImplementation(name,
+        hostAddress,
+        listeningPort,
+        codec,
+        errorHandler,
+        orderingGuarantee,
+        numberOfTries,
+        retryTimeout,
+        this.localAddressProvider,
+        this.tcpPortProvider);
+
+  }
+
+  @Override
+  public <T> RemoteManager getInstance(String name, Codec<T> codec, 
EventHandler<Throwable> errorHandler) {
+    return new DefaultRemoteManagerImplementation(name,
+        DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to 
use the localAddressProvider
+        0, // Indicate to use the tcpPortProvider
+        codec,
+        errorHandler,
+        this.orderingGuarantee,
+        this.numberOfTries,
+        this.retryTimeout,
+        this.localAddressProvider,
+        this.tcpPortProvider);
+  }
+
+  @Override
+  public <T> RemoteManager getInstance(final String name,
+                                       final int listeningPort,
+                                       final Codec<T> codec,
+                                       final EventHandler<Throwable> 
errorHandler) {
+    return new DefaultRemoteManagerImplementation(name,
+        DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to 
use the localAddressProvider
+        listeningPort,
+        codec,
+        errorHandler,
+        this.orderingGuarantee,
+        this.numberOfTries,
+        this.retryTimeout,
+        this.localAddressProvider,
+        this.tcpPortProvider);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index f5d7a9c..c53d063 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -61,10 +61,14 @@ public class DefaultRemoteManagerImplementation implements 
RemoteManager {
   private final HandlerContainer handlerContainer;
   private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
   private RemoteIdentifier myIdentifier;
+  /**
+   * Indicates a hostname that isn't set or known.
+   */
+  public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
 
 
   /**
-   * @deprecated have an instance injected instead.
+   * @deprecated have an instance injected instead. Or use 
RemoteManagerFactory.getInstance()
    */
   @Deprecated
   public <T> DefaultRemoteManagerImplementation(
@@ -113,7 +117,7 @@ public class DefaultRemoteManagerImplementation implements 
RemoteManager {
         new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
         new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
 
-    final String host = "##UNKNOWN##".equals(hostAddress) ? 
localAddressProvider.getLocalAddress() : hostAddress;
+    final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? 
localAddressProvider.getLocalAddress() : hostAddress;
     this.transport = new NettyMessagingTransport(
             host, listeningPort, this.reRecvStage, this.reRecvStage, 
numberOfTries, retryTimeout, tcpPortProvider);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
index 48f2c25..6ddde01 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java
@@ -18,17 +18,16 @@
  */
 package org.apache.reef.wake.test.remote;
 
-import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.impl.DefaultIdentifierFactory;
 import org.apache.reef.wake.impl.LoggingEventHandler;
-import org.apache.reef.wake.remote.*;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.RemoteIdentifier;
+import org.apache.reef.wake.remote.RemoteManager;
+import org.apache.reef.wake.remote.RemoteManagerFactory;
 import org.apache.reef.wake.remote.impl.MultiCodec;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,8 +59,8 @@ public class RemoteIdentifierFactoryTest {
 
   @Test
   public void testRemoteManagerIdentifier() throws Exception {
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    final LocalAddressProvider localAddressProvider = 
injector.getInstance(LocalAddressProvider.class);
+    final RemoteManagerFactory remoteManagerFactory = 
Tang.Factory.getTang().newInjector()
+        .getInstance(RemoteManagerFactory.class);
 
     final int port = 9100;
     final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, 
Codec<?>>();
@@ -69,9 +68,8 @@ public class RemoteIdentifierFactoryTest {
     final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap);
 
 
-    try (final RemoteManager rm = new 
DefaultRemoteManagerImplementation("TestRemoteManager",
-        localAddressProvider.getLocalAddress(), port, codec, new 
LoggingEventHandler<Throwable>(), false, 1, 10000,
-        localAddressProvider, RangeTcpPortProvider.Default)) {
+    try (final RemoteManager rm =
+             remoteManagerFactory.getInstance("TestRemoteManager", port, 
codec, new LoggingEventHandler<Throwable>())) {
       final RemoteIdentifier id = rm.getMyIdentifier();
 
       final IdentifierFactory factory = new DefaultIdentifierFactory();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
index ad17964..19abc1d 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.reef.wake.test.remote;
 
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingEventHandler;
@@ -25,9 +27,7 @@ import org.apache.reef.wake.impl.LoggingUtils;
 import org.apache.reef.wake.impl.TimerStage;
 import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import 
org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation;
-import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
 import org.apache.reef.wake.remote.impl.MultiCodec;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
 import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
@@ -52,9 +52,12 @@ import java.util.logging.Level;
 public class RemoteManagerTest {
 
   private final LocalAddressProvider localAddressProvider;
+  private final RemoteManagerFactory remoteManagerFactory;
 
   public RemoteManagerTest() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.localAddressProvider = 
injector.getInstance(LocalAddressProvider.class);
+    this.remoteManagerFactory = 
injector.getInstance(RemoteManagerFactory.class);
   }
 
   @Rule
@@ -80,9 +83,9 @@ public class RemoteManagerTest {
 
     final String hostAddress = localAddressProvider.getLocalAddress();
 
-    final RemoteManager rm = new DefaultRemoteManagerImplementation(
+    final RemoteManager rm = this.remoteManagerFactory.getInstance(
         "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000,
-            localAddressProvider, RangeTcpPortProvider.Default);
+        localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -183,9 +186,9 @@ public class RemoteManagerTest {
 
     final String hostAddress = localAddressProvider.getLocalAddress();
 
-    final RemoteManager rm = new DefaultRemoteManagerImplementation(
+    final RemoteManager rm = this.remoteManagerFactory.getInstance(
         "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), true, 3, 10000,
-            localAddressProvider, RangeTcpPortProvider.Default);
+        localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -227,9 +230,9 @@ public class RemoteManagerTest {
 
     String hostAddress = localAddressProvider.getLocalAddress();
 
-    final RemoteManager rm = new DefaultRemoteManagerImplementation(
+    final RemoteManager rm = this.remoteManagerFactory.getInstance(
         "name", hostAddress, port, codec, new 
LoggingEventHandler<Throwable>(), false, 3, 10000,
-            localAddressProvider, RangeTcpPortProvider.Default);
+        localAddressProvider, RangeTcpPortProvider.Default);
 
     RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
     RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
@@ -267,10 +270,7 @@ public class RemoteManagerTest {
 
     ExceptionHandler errorHandler = new ExceptionHandler(monitor);
 
-    try (final RemoteManager rm = new DefaultRemoteManagerImplementation(
-        "name", hostAddress, port, codec, errorHandler, false, 3, 10000, 
localAddressProvider,
-            RangeTcpPortProvider.Default)) {
-
+    try (final RemoteManager rm = remoteManagerFactory.getInstance("name", 
port, codec, errorHandler)) {
       RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
       RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + port);
 
@@ -296,9 +296,9 @@ public class RemoteManagerTest {
     Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap);
 
     String hostAddress = localAddressProvider.getLocalAddress();
-    return new DefaultRemoteManagerImplementation(name, hostAddress, localPort,
+    return remoteManagerFactory.getInstance(name, hostAddress, localPort,
         codec, new LoggingEventHandler<Throwable>(), false, retry, 
retryTimeout,
-            localAddressProvider, RangeTcpPortProvider.Default);
+        localAddressProvider, RangeTcpPortProvider.Default);
   }
 
   private class SendingRemoteManagerThread implements Callable<Integer> {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
index 3280bd6..1167746 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java
@@ -24,20 +24,20 @@ import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.impl.LoggingEventHandler;
 import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
 import 
org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation;
-import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation;
-import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
 
 import javax.inject.Inject;
 import java.net.UnknownHostException;
 
 public class TestRemote implements Runnable {
+  private final RemoteManagerFactory remoteManagerFactory;
   private final LocalAddressProvider localAddressProvider;
 
   @Inject
-  public TestRemote() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  public TestRemote(final LocalAddressProvider localAddressProvider,
+                    final RemoteManagerFactory remoteManagerFactory) {
+    this.localAddressProvider = localAddressProvider;
+    this.remoteManagerFactory = remoteManagerFactory;
   }
 
   @Override
@@ -46,9 +46,7 @@ public class TestRemote implements Runnable {
     int myPort = 10011;
     int remotePort = 10001;
     Codec<TestEvent> codec = new TestEventCodec();
-    try (RemoteManager rm = new DefaultRemoteManagerImplementation("name", 
hostAddress,
-        myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, 
localAddressProvider,
-            RangeTcpPortProvider.Default)) {
+    try (RemoteManager rm = remoteManagerFactory.getInstance("name", myPort, 
codec, new LoggingEventHandler<Throwable>())) {
       // proxy handler
       RemoteIdentifierFactory factory = new 
DefaultRemoteIdentifierFactoryImplementation();
       RemoteIdentifier remoteId = factory.getNewInstance("socket://" + 
hostAddress + ":" + remotePort);

Reply via email to