Repository: incubator-reef Updated Branches: refs/heads/master 19fdfcc65 -> 637f9fcaa
[REEF-283] Introduce RemoteManagerFactory This change adds the new interface `RemoteManagerFactory` together with its default implementation `DefaultRemoteManagerFactory`. The factory is injectable, yet offers all the constructors of `DefaultRemoteManager` through its methods. This allows us to mix and match between Tang Configuration and ad-hoc overrides. JIRA: [REEF-283](https://issues.apache.org/jira/browse/REEF-283) Pull Request: Closes #164 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/637f9fca Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/637f9fca Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/637f9fca Branch: refs/heads/master Commit: 637f9fcaa33ed18ddeaa1e9830c0f2f5a3df8cb3 Parents: 19fdfcc Author: Markus Weimer <[email protected]> Authored: Mon Apr 27 13:19:52 2015 -0700 Committer: Brian Cho <[email protected]> Committed: Thu Apr 30 18:34:30 2015 +0900 ---------------------------------------------------------------------- .../runtime/mesos/util/MesosRemoteManager.java | 11 +- .../reef/wake/remote/RemoteManagerFactory.java | 116 ++++++++++++++ .../impl/DefaultRemoteManagerFactory.java | 153 +++++++++++++++++++ .../DefaultRemoteManagerImplementation.java | 8 +- .../remote/RemoteIdentifierFactoryTest.java | 18 +-- .../wake/test/remote/RemoteManagerTest.java | 30 ++-- .../reef/wake/test/remote/TestRemote.java | 14 +- 7 files changed, 309 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java index 9a4f974..20effe0 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java @@ -19,12 +19,10 @@ package org.apache.reef.runtime.mesos.util; import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.RemoteIdentifierFactory; import org.apache.reef.wake.remote.RemoteManager; +import org.apache.reef.wake.remote.RemoteManagerFactory; import org.apache.reef.wake.remote.RemoteMessage; -import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import javax.inject.Inject; @@ -41,10 +39,11 @@ public final class MesosRemoteManager { MesosRemoteManager(final RemoteIdentifierFactory factory, final MesosErrorHandler mesosErrorHandler, final MesosRemoteManagerCodec codec, - final LocalAddressProvider localAddressProvider) { + final RemoteManagerFactory remoteManagerFactory) { this.factory = factory; - this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", "##UNKNOWN##", 0, - codec, mesosErrorHandler, false, 3, 10000, localAddressProvider, RangeTcpPortProvider.Default); + this.raw = remoteManagerFactory.getInstance("MESOS_EXECUTOR", codec, mesosErrorHandler); + + } public <T> EventHandler<T> getHandler( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java new file mode 100644 index 0000000..4c741de --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote; + +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.impl.DefaultRemoteManagerFactory; +import org.apache.reef.wake.remote.ports.TcpPortProvider; + +/** + * Injectable Factory for RemoteManager instances. + * <p/> + * Use when direct injection of the RemoteManager is impossible. + */ +@DefaultImplementation(DefaultRemoteManagerFactory.class) +public interface RemoteManagerFactory { + + /** + * @param name the name of used by the returned RemoteManager to determine the address to bind to. to instantiate. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + RemoteManager getInstance(final String name); + + /** + * @param name the name of the returned RemoteManager to instantiate. + * @param codec the codec to use to decode the messages sent to / by this RemoteManager. + * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. + * @param <T> the message type sent / received by the returned RemoteManager. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + <T> RemoteManager getInstance(final String name, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler); + + /** + * @param name the name of the returned RemoteManager to instantiate. + * @param listeningPort the port on which the returned RemoteManager listens. + * @param codec the codec to use to decode the messages sent to / by this RemoteManager. + * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. + * @param <T> the message type sent / received by the returned RemoteManager. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + <T> RemoteManager getInstance(final String name, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler); + + /** + * The old constructor of DefaultRemoteManagerImplementation. Avoid if you can. + * + * @param name the name of the returned RemoteManager to instantiate. + * @param hostAddress the address the returned RemoteManager binds to. + * @param listeningPort the port on which the returned RemoteManager listens. + * @param codec the codec to use to decode the messages sent to / by this RemoteManager. + * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. + * @param orderingGuarantee whether or not the returned RemoteManager should guarantee message orders. + * @param numberOfTries the number of retries before the returned RemoteManager declares sending a failure. + * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt failed. + * @param <T> the message type sent / received by the returned RemoteManager. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout); + + /** + * The all-out constructor of DefaultRemoteManagerImplementation. Avoid if you can. + * + * @param name the name of the returned RemoteManager to instantiate. + * @param hostAddress the address the returned RemoteManager binds to. + * @param listeningPort the port on which the returned RemoteManager listens. + * @param codec the codec to use to decode the messages sent to / by this RemoteManager. + * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. + * @param orderingGuarantee whether or not the returned RemoteManager should guarantee message orders. + * @param numberOfTries the number of retries before the returned RemoteManager declares sending a failure. + * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt failed. + * @param localAddressProvider the LocalAddressProvider used by the returned RemoteManager to determine the address to bind to. + * @param tcpPortProvider the TcpPortProvider used by the returned RemoteManager to determine the port to listen to. + * @param <T> the message type sent / received by the returned RemoteManager. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider); + + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java new file mode 100644 index 0000000..e36b4dc --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.impl; + +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.remote.RemoteManager; +import org.apache.reef.wake.remote.RemoteManagerFactory; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; + +import javax.inject.Inject; + +/** + * Default implementation of RemoteManagerFactory. + */ +public class DefaultRemoteManagerFactory implements RemoteManagerFactory { + + private final Codec<?> codec; + private final EventHandler<Throwable> errorHandler; + private final boolean orderingGuarantee; + private final int numberOfTries; + private final int retryTimeout; + private final LocalAddressProvider localAddressProvider; + private final TcpPortProvider tcpPortProvider; + + @Inject + private DefaultRemoteManagerFactory( + final @Parameter(RemoteConfiguration.MessageCodec.class) Codec<?> codec, + final @Parameter(RemoteConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler, + final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean orderingGuarantee, + final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, + final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider) { + this.codec = codec; + this.errorHandler = errorHandler; + this.orderingGuarantee = orderingGuarantee; + this.numberOfTries = numberOfTries; + this.retryTimeout = retryTimeout; + this.localAddressProvider = localAddressProvider; + this.tcpPortProvider = tcpPortProvider; + } + + @Override + public RemoteManager getInstance(final String name) { + return new DefaultRemoteManagerImplementation(name, + DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider + 0, // Indicate to use the tcpPortProvider + this.codec, + this.errorHandler, + this.orderingGuarantee, + this.numberOfTries, + this.retryTimeout, + this.localAddressProvider, + this.tcpPortProvider); + } + + + @Override + public <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider) { + return new DefaultRemoteManagerImplementation(name, + hostAddress, + listeningPort, + codec, + errorHandler, + orderingGuarantee, + numberOfTries, + retryTimeout, + localAddressProvider, + tcpPortProvider); + } + + @Override + public <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout) { + return new DefaultRemoteManagerImplementation(name, + hostAddress, + listeningPort, + codec, + errorHandler, + orderingGuarantee, + numberOfTries, + retryTimeout, + this.localAddressProvider, + this.tcpPortProvider); + + } + + @Override + public <T> RemoteManager getInstance(String name, Codec<T> codec, EventHandler<Throwable> errorHandler) { + return new DefaultRemoteManagerImplementation(name, + DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider + 0, // Indicate to use the tcpPortProvider, + codec, + errorHandler, + this.orderingGuarantee, + this.numberOfTries, + this.retryTimeout, + this.localAddressProvider, + this.tcpPortProvider); + } + + @Override + public <T> RemoteManager getInstance(final String name, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler) { + return new DefaultRemoteManagerImplementation(name, + DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider + listeningPort, + codec, + errorHandler, + this.orderingGuarantee, + this.numberOfTries, + this.retryTimeout, + this.localAddressProvider, + this.tcpPortProvider); + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index f5d7a9c..c53d063 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -61,10 +61,14 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { private final HandlerContainer handlerContainer; private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator(); private RemoteIdentifier myIdentifier; + /** + * Indicates a hostname that isn't set or known. + */ + public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##"; /** - * @deprecated have an instance injected instead. + * @deprecated have an instance injected instead. Or use RemoteManagerFactory.getInstance() */ @Deprecated public <T> DefaultRemoteManagerImplementation( @@ -113,7 +117,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); - final String host = "##UNKNOWN##".equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; + final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; this.transport = new NettyMessagingTransport( host, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java index 48f2c25..6ddde01 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java @@ -18,17 +18,16 @@ */ package org.apache.reef.wake.test.remote; -import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.impl.DefaultIdentifierFactory; import org.apache.reef.wake.impl.LoggingEventHandler; -import org.apache.reef.wake.remote.*; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.RemoteIdentifier; +import org.apache.reef.wake.remote.RemoteManager; +import org.apache.reef.wake.remote.RemoteManagerFactory; import org.apache.reef.wake.remote.impl.MultiCodec; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -60,8 +59,8 @@ public class RemoteIdentifierFactoryTest { @Test public void testRemoteManagerIdentifier() throws Exception { - final Injector injector = Tang.Factory.getTang().newInjector(); - final LocalAddressProvider localAddressProvider = injector.getInstance(LocalAddressProvider.class); + final RemoteManagerFactory remoteManagerFactory = Tang.Factory.getTang().newInjector() + .getInstance(RemoteManagerFactory.class); final int port = 9100; final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); @@ -69,9 +68,8 @@ public class RemoteIdentifierFactoryTest { final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - try (final RemoteManager rm = new DefaultRemoteManagerImplementation("TestRemoteManager", - localAddressProvider.getLocalAddress(), port, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, - localAddressProvider, RangeTcpPortProvider.Default)) { + try (final RemoteManager rm = + remoteManagerFactory.getInstance("TestRemoteManager", port, codec, new LoggingEventHandler<Throwable>())) { final RemoteIdentifier id = rm.getMyIdentifier(); final IdentifierFactory factory = new DefaultIdentifierFactory(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index ad17964..19abc1d 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -18,6 +18,8 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; @@ -25,9 +27,7 @@ import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; -import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; @@ -52,9 +52,12 @@ import java.util.logging.Level; public class RemoteManagerTest { private final LocalAddressProvider localAddressProvider; + private final RemoteManagerFactory remoteManagerFactory; public RemoteManagerTest() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.remoteManagerFactory = injector.getInstance(RemoteManagerFactory.class); } @Rule @@ -80,9 +83,9 @@ public class RemoteManagerTest { final String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = new DefaultRemoteManagerImplementation( + final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -183,9 +186,9 @@ public class RemoteManagerTest { final String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = new DefaultRemoteManagerImplementation( + final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -227,9 +230,9 @@ public class RemoteManagerTest { String hostAddress = localAddressProvider.getLocalAddress(); - final RemoteManager rm = new DefaultRemoteManagerImplementation( + final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -267,10 +270,7 @@ public class RemoteManagerTest { ExceptionHandler errorHandler = new ExceptionHandler(monitor); - try (final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, errorHandler, false, 3, 10000, localAddressProvider, - RangeTcpPortProvider.Default)) { - + try (final RemoteManager rm = remoteManagerFactory.getInstance("name", port, codec, errorHandler)) { RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -296,9 +296,9 @@ public class RemoteManagerTest { Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); String hostAddress = localAddressProvider.getLocalAddress(); - return new DefaultRemoteManagerImplementation(name, hostAddress, localPort, + return remoteManagerFactory.getInstance(name, hostAddress, localPort, codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, RangeTcpPortProvider.Default); } private class SendingRemoteManagerThread implements Callable<Integer> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/637f9fca/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java index 3280bd6..1167746 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java @@ -24,20 +24,20 @@ import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; -import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import javax.inject.Inject; import java.net.UnknownHostException; public class TestRemote implements Runnable { + private final RemoteManagerFactory remoteManagerFactory; private final LocalAddressProvider localAddressProvider; @Inject - public TestRemote() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + public TestRemote(final LocalAddressProvider localAddressProvider, + final RemoteManagerFactory remoteManagerFactory) { + this.localAddressProvider = localAddressProvider; + this.remoteManagerFactory = remoteManagerFactory; } @Override @@ -46,9 +46,7 @@ public class TestRemote implements Runnable { int myPort = 10011; int remotePort = 10001; Codec<TestEvent> codec = new TestEventCodec(); - try (RemoteManager rm = new DefaultRemoteManagerImplementation("name", hostAddress, - myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, localAddressProvider, - RangeTcpPortProvider.Default)) { + try (RemoteManager rm = remoteManagerFactory.getInstance("name", myPort, codec, new LoggingEventHandler<Throwable>())) { // proxy handler RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort);
