Repository: reef Updated Branches: refs/heads/master e5a11f3bd -> ccecdd577
[REEF-991] Remove RangeTcpPortProvider.Default and DefaultRemoteManagerImplementation constructor This PR makes the following changes: * Remove `RangeTcpPortProvider.Default` and its usage * Remove the deprecated public `DefaultRemoteManagerImplementation` constructor and its usage * Make the `DefaultRemoteManagerImplementation` class `final` JIRA: [REEF-991](https://issues.apache.org/jira/browse/REEF-991) Pull request: This closes #666 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ccecdd57 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ccecdd57 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ccecdd57 Branch: refs/heads/master Commit: ccecdd5778829d118c3d29f9b75c62e68f1260bf Parents: e5a11f3 Author: Dongjoon Hyun <[email protected]> Authored: Fri Nov 20 20:01:54 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Mon Dec 14 07:09:57 2015 -0800 ---------------------------------------------------------------------- .../remote/DefaultRemoteManagerFactory.java | 192 +++++++++++++++++++ .../reef/wake/remote/RemoteManagerFactory.java | 1 - .../impl/DefaultRemoteManagerFactory.java | 163 ---------------- .../DefaultRemoteManagerImplementation.java | 25 +-- .../wake/remote/ports/RangeTcpPortProvider.java | 10 - .../netty/MessagingTransportFactory.java | 38 +++- .../wake/test/remote/RemoteManagerTest.java | 19 +- 7 files changed, 238 insertions(+), 210 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java new file mode 100644 index 0000000..442d4fa --- 
/dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote; + +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; + +/** + * Default implementation of RemoteManagerFactory. 
+ */ +final class DefaultRemoteManagerFactory implements RemoteManagerFactory { + + private final Injector injector; + + private final Codec<?> codec; + private final EventHandler<Throwable> errorHandler; + private final boolean orderingGuarantee; + private final int numberOfTries; + private final int retryTimeout; + private final LocalAddressProvider localAddressProvider; + private final TransportFactory transportFactory; + private final TcpPortProvider tcpPortProvider; + + @Inject + private DefaultRemoteManagerFactory( + @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<?> codec, + @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, + @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, + @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, + @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory, + final TcpPortProvider tcpPortProvider, + final Injector injector) { + this.injector = injector.forkInjector(); + this.codec = codec; + this.errorHandler = errorHandler; + this.orderingGuarantee = orderingGuarantee; + this.numberOfTries = numberOfTries; + this.retryTimeout = retryTimeout; + this.localAddressProvider = localAddressProvider; + this.transportFactory = tpFactory; + this.tcpPortProvider = tcpPortProvider; + } + + @Override + public RemoteManager getInstance(final String name) { + try { + final Injector newInjector = injector.forkInjector(); + newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, this.codec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, this.errorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); + 
newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); + newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); + return newInjector.getInstance(RemoteManager.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("checkstyle:hiddenfield") + public <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider) { + try { + final Injector newInjector = injector.forkInjector(); + newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); + newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); + newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee); + newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, localAddressProvider); + newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); + 
newInjector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); + return newInjector.getInstance(RemoteManager.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("checkstyle:hiddenfield") + public <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout) { + try { + final Injector newInjector = injector.forkInjector(); + newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); + newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); + newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee); + newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); + newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); + return newInjector.getInstance(RemoteManager.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("checkstyle:hiddenfield") + public <T> RemoteManager getInstance( + final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) { + try { + final Injector newInjector = injector.forkInjector(); + 
newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); + newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); + newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); + return newInjector.getInstance(RemoteManager.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + @Override + @SuppressWarnings("checkstyle:hiddenfield") + public <T> RemoteManager getInstance(final String name, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler) { + try { + final Injector newInjector = injector.forkInjector(); + newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); + newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); + newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + 
newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); + newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); + return newInjector.getInstance(RemoteManager.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java index b27f7bc..04c928d 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java @@ -21,7 +21,6 @@ package org.apache.reef.wake.remote; import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.impl.DefaultRemoteManagerFactory; import org.apache.reef.wake.remote.ports.TcpPortProvider; /** http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java deleted file mode 100644 index bb8ff2d..0000000 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.wake.remote.impl; - -import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.RemoteConfiguration; -import org.apache.reef.wake.remote.RemoteManager; -import org.apache.reef.wake.remote.RemoteManagerFactory; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.ports.TcpPortProvider; -import org.apache.reef.wake.remote.transport.TransportFactory; - -import javax.inject.Inject; - -/** - * Default implementation of RemoteManagerFactory. 
- */ -public final class DefaultRemoteManagerFactory implements RemoteManagerFactory { - - private final Codec<?> codec; - private final EventHandler<Throwable> errorHandler; - private final boolean orderingGuarantee; - private final int numberOfTries; - private final int retryTimeout; - private final LocalAddressProvider localAddressProvider; - private final TransportFactory tpFactory; - - @Inject - private DefaultRemoteManagerFactory( - @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<?> codec, - @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, - @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, - @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, - @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, - final LocalAddressProvider localAddressProvider, - final TransportFactory tpFactory) { - this.codec = codec; - this.errorHandler = errorHandler; - this.orderingGuarantee = orderingGuarantee; - this.numberOfTries = numberOfTries; - this.retryTimeout = retryTimeout; - this.localAddressProvider = localAddressProvider; - this.tpFactory = tpFactory; - } - - // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor. - @Override - public RemoteManager getInstance(final String name) { - return new DefaultRemoteManagerImplementation(name, - DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider - 0, // Indicate to use the tcpPortProvider - this.codec, - this.errorHandler, - this.orderingGuarantee, - this.numberOfTries, - this.retryTimeout, - this.localAddressProvider, - this.tpFactory); - } - - // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor. 
- @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final String hostAddress, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler, - final boolean orderingGuarantee, - final int numberOfTries, - final int retryTimeout, - final LocalAddressProvider localAddressProvider, - final TcpPortProvider tcpPortProvider) { - return new DefaultRemoteManagerImplementation(name, - hostAddress, - listeningPort, - codec, - errorHandler, - orderingGuarantee, - numberOfTries, - retryTimeout, - localAddressProvider, - tpFactory); - } - - // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor. - @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final String hostAddress, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler, - final boolean orderingGuarantee, - final int numberOfTries, - final int retryTimeout) { - return new DefaultRemoteManagerImplementation(name, - hostAddress, - listeningPort, - codec, - errorHandler, - orderingGuarantee, - numberOfTries, - retryTimeout, - this.localAddressProvider, - this.tpFactory); - - } - - // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor. 
- @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance( - final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) { - return new DefaultRemoteManagerImplementation(name, - DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider - 0, // Indicate to use the tcpPortProvider, - codec, - errorHandler, - this.orderingGuarantee, - this.numberOfTries, - this.retryTimeout, - this.localAddressProvider, - this.tpFactory); - } - - // TODO[REEF-547]: This method uses deprecated DefaultRemoteManagerImplementation constructor. - @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler) { - return new DefaultRemoteManagerImplementation(name, - DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider - listeningPort, - codec, - errorHandler, - this.orderingGuarantee, - this.numberOfTries, - this.retryTimeout, - this.localAddressProvider, - this.tpFactory); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index 3aee204..3e972fe 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -24,7 +24,6 @@ import org.apache.reef.wake.EventHandler; import 
org.apache.reef.wake.impl.StageManager; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.TransportFactory; @@ -43,7 +42,7 @@ import java.util.logging.Logger; /** * Default remote manager implementation. */ -public class DefaultRemoteManagerImplementation implements RemoteManager { +public final class DefaultRemoteManagerImplementation implements RemoteManager { private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName()); @@ -66,28 +65,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { */ public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME; - /** - * @deprecated have an instance injected instead. - */ - @Deprecated - @Inject - public <T> DefaultRemoteManagerImplementation( - @Parameter(RemoteConfiguration.ManagerName.class) final String name, - @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, - @Parameter(RemoteConfiguration.Port.class) final int listeningPort, - @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec, - @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, - @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, - @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, - @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, - final LocalAddressProvider localAddressProvider, - final TransportFactory tpFactory) { - this(name, hostAddress, listeningPort, codec, errorHandler, orderingGuarantee, numberOfTries, retryTimeout, - localAddressProvider, tpFactory, RangeTcpPortProvider.Default); - } - @Inject - private <T> 
DefaultRemoteManagerImplementation( + private <T> DefaultRemoteManagerImplementation( @Parameter(RemoteConfiguration.ManagerName.class) final String name, @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, @Parameter(RemoteConfiguration.Port.class) final int listeningPort, http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java index a5669be..fe959a2 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java @@ -58,16 +58,6 @@ public final class RangeTcpPortProvider implements TcpPortProvider { return new RandomRangeIterator(portRangeBegin, portRangeCount, portRangeTryCount); } - /** - * @deprecated in 0.12 have an instance injected instead. 
- */ - @Deprecated - @SuppressWarnings("checkstyle:constantname") - public static final RangeTcpPortProvider Default = new RangeTcpPortProvider( - Integer.parseInt(TcpPortRangeBegin.DEFAULT_VALUE), - Integer.parseInt(TcpPortRangeCount.DEFAULT_VALUE), - Integer.parseInt(TcpPortRangeTryCount.DEFAULT_VALUE)); - @Override public String toString() { return "RangeTcpPortProvider{" + http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java index efc67de..0409553 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java @@ -27,7 +27,6 @@ import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.remote.RemoteConfiguration; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.TransportFactory; @@ -81,19 +80,46 @@ public class MessagingTransportFactory implements TransportFactory { } } - // TODO[REEF-547]: This method uses deprecated RangeTcpPortProvider.Default. Must remove usages and deprecate. + /** + * Creates a transport. 
+ * + * @param hostAddress a host address + * @param port a listening port + * @param clientStage a client stage + * @param serverStage a server stage + * @param numberOfTries a number of tries + * @param retryTimeout a timeout for retry + */ @Override - public Transport newInstance(final String hostAddress, final int port, + public Transport newInstance(final String hostAddress, + final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, final int retryTimeout) { - return newInstance(hostAddress, port, clientStage, - serverStage, numberOfTries, retryTimeout, RangeTcpPortProvider.Default); + try { + TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); + return newInstance(hostAddress, port, clientStage, + serverStage, numberOfTries, retryTimeout, tcpPortProvider); + } catch (final InjectionException e) { + throw new RuntimeException(e); + } } + /** + * Creates a transport. 
+ * + * @param hostAddress a host address + * @param port a listening port + * @param clientStage a client stage + * @param serverStage a server stage + * @param numberOfTries a number of tries + * @param retryTimeout a timeout for retry + * @param tcpPortProvider a provider for TCP port + */ @Override - public Transport newInstance(final String hostAddress, final int port, + public Transport newInstance(final String hostAddress, + final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, http://git-wip-us.apache.org/repos/asf/reef/blob/ccecdd57/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index a46a54e..196d7d6 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -30,7 +30,7 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -88,7 +88,7 @@ public class RemoteManagerTest { final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, - localAddressProvider, 
RangeTcpPortProvider.Default); + localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); @@ -192,7 +192,7 @@ public class RemoteManagerTest { final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); @@ -236,7 +236,7 @@ public class RemoteManagerTest { final RemoteManager rm = this.remoteManagerFactory.getInstance( "name", hostAddress, PORT, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, - localAddressProvider, RangeTcpPortProvider.Default); + localAddressProvider, Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class)); final RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); final RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + PORT); @@ -301,9 +301,14 @@ public class RemoteManagerTest { final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); final String hostAddress = localAddressProvider.getLocalAddress(); - return remoteManagerFactory.getInstance(rmName, hostAddress, localPort, - codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, - localAddressProvider, RangeTcpPortProvider.Default); + try { + TcpPortProvider tcpPortProvider = Tang.Factory.getTang().newInjector().getInstance(TcpPortProvider.class); + return remoteManagerFactory.getInstance(rmName, hostAddress, localPort, + 
codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, + localAddressProvider, tcpPortProvider); + } catch (final InjectionException e) { + throw new RuntimeException(e); + } } private class SendingRemoteManagerThread implements Callable<Integer> {
