Repository: reef Updated Branches: refs/heads/master 0314e61a5 -> 49157eeda
[REEF-1905] Create RemoteManager with given host and port; clean up the RM factory API Summary of changes: * Add a new `.getInstance()` method to create RM by a given host name and port number * remove duplicate code from `DefaultRemoteManagerImplementation` and call one uber-method for all `.getInstance()` versions JIRA: [REEF-1905](https://issues.apache.org/jira/browse/REEF-1905) Pull request: This closes #1394 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/49157eed Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/49157eed Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/49157eed Branch: refs/heads/master Commit: 49157eeda190f073b6629f04323e1da75261ea99 Parents: 0314e61 Author: Sergiy Matusevych <[email protected]> Authored: Wed Oct 18 16:40:33 2017 -0700 Committer: Doug Service <[email protected]> Committed: Tue Oct 24 02:32:40 2017 +0000 ---------------------------------------------------------------------- .../remote/DefaultRemoteManagerFactory.java | 170 +++++++------------ .../reef/wake/remote/RemoteManagerFactory.java | 13 ++ 2 files changed, 78 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/49157eed/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java index 1e097b6..0a0393c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java @@ -34,7 +34,7 @@ import javax.inject.Inject; */ final class DefaultRemoteManagerFactory implements RemoteManagerFactory { - private final Injector injector; + private final Injector injector = Tang.Factory.getTang().newInjector(); private final Codec<?> codec; private final EventHandler<Throwable> errorHandler; @@ -55,7 +55,7 @@ final class DefaultRemoteManagerFactory implements RemoteManagerFactory { final LocalAddressProvider localAddressProvider, final TransportFactory tpFactory, final TcpPortProvider tcpPortProvider) { - this.injector = Tang.Factory.getTang().newInjector(); + this.codec = codec; this.errorHandler = errorHandler; this.orderingGuarantee = orderingGuarantee; @@ -67,125 +67,85 @@ final class DefaultRemoteManagerFactory implements RemoteManagerFactory { } @Override - public RemoteManager getInstance(final String name) { - try { - final Injector newInjector = injector.forkInjector(); - newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); - newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, this.codec); - newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, this.errorHandler); - newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); - newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); - newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); - newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); - newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); - return newInjector.getInstance(RemoteManager.class); - } catch (InjectionException e) { - throw new RuntimeException(e); - } + public RemoteManager getInstance(final String newRmName) { + return getInstance(newRmName, 0, this.codec, this.errorHandler); } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final String hostAddress, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler, - final boolean orderingGuarantee, - final int numberOfTries, - final int retryTimeout, - final LocalAddressProvider localAddressProvider, - final TcpPortProvider tcpPortProvider) { - try { - final Injector newInjector = injector.forkInjector(); - newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); - newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); - newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); - newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); - newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); - newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee); - newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); - newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); - newInjector.bindVolatileInstance(LocalAddressProvider.class, localAddressProvider); - newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); - newInjector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); - return newInjector.getInstance(RemoteManager.class); - } catch (InjectionException e) { - throw new RuntimeException(e); - } + public <T> RemoteManager getInstance(final String newRmName, + final String newHostAddress, + final int newListeningPort, + final Codec<T> newCodec) { + return getInstance(newRmName, newHostAddress, newListeningPort, newCodec, + this.errorHandler, this.orderingGuarantee, this.numberOfTries, this.retryTimeout, + this.localAddressProvider, this.tcpPortProvider); } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final String hostAddress, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler, - final boolean orderingGuarantee, - final int numberOfTries, - final int retryTimeout) { - try { - final Injector newInjector = injector.forkInjector(); - newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); - newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); - newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); - newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); - newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); - newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee); - newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); - newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); - newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); - newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); - return newInjector.getInstance(RemoteManager.class); - } catch (InjectionException e) { - throw new RuntimeException(e); - } + public <T> RemoteManager getInstance(final String newRmName, + final String newHostAddress, + final int newListeningPort, + final Codec<T> newCodec, + final EventHandler<Throwable> newErrorHandler, + final boolean newOrderingGuarantee, + final int newNumberOfTries, + final int newRetryTimeout) { + return getInstance(newRmName, newHostAddress, newListeningPort, newCodec, newErrorHandler, + newOrderingGuarantee, newNumberOfTries, newRetryTimeout, this.localAddressProvider, this.tcpPortProvider); } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance( - final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) { - try { - final Injector newInjector = injector.forkInjector(); - newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); - newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); - newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); - newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); - newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); - newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); - newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); - newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); - return newInjector.getInstance(RemoteManager.class); - } catch (InjectionException e) { - throw new RuntimeException(e); - } + public <T> RemoteManager getInstance(final String newRmName, + final Codec<T> newCodec, + final EventHandler<Throwable> newErrorHandler) { + return getInstance(newRmName, 0, newCodec, newErrorHandler); } @Override - @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(final String name, - final int listeningPort, - final Codec<T> codec, - final EventHandler<Throwable> errorHandler) { + public <T> RemoteManager getInstance(final String newRmName, + final int newListeningPort, + final Codec<T> newCodec, + final EventHandler<Throwable> newErrorHandler) { + return getInstance(newRmName, null, newListeningPort, newCodec, newErrorHandler, this.orderingGuarantee, + this.numberOfTries, this.retryTimeout, this.localAddressProvider, this.tcpPortProvider); + } + + @Override + public <T> RemoteManager getInstance(final String newRmName, + final String newHostAddress, + final int newListeningPort, + final Codec<T> newCodec, + final EventHandler<Throwable> newErrorHandler, + final boolean newOrderingGuarantee, + final int newNumberOfTries, + final int newRetryTimeout, + final LocalAddressProvider newLocalAddressProvider, + final TcpPortProvider newTcpPortProvider) { try { + final Injector newInjector = injector.forkInjector(); - newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name); - newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort); - newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec); - newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler); - newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee); - newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries); - newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout); - newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + + if (newHostAddress != null) { + newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, newHostAddress); + } + + if (newListeningPort > 0) { + newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, newListeningPort); + } + + newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, newRmName); + newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, newCodec); + newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, newErrorHandler); + newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, newOrderingGuarantee); + newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, newNumberOfTries); + newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, newRetryTimeout); + newInjector.bindVolatileInstance(LocalAddressProvider.class, newLocalAddressProvider); newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory); - newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider); + newInjector.bindVolatileInstance(TcpPortProvider.class, newTcpPortProvider); + return newInjector.getInstance(RemoteManager.class); - } catch (InjectionException e) { + + } catch (final InjectionException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/reef/blob/49157eed/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java index 04c928d..60bd2b8 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java @@ -62,6 +62,19 @@ public interface RemoteManagerFactory { final EventHandler<Throwable> errorHandler); /** + * @param name the name of the returned RemoteManager to instantiate. + * @param hostAddress the address the returned RemoteManager binds to. + * @param listeningPort the port on which the returned RemoteManager listens. + * @param codec the codec to use to decode the messages sent to / by this RemoteManager. + * @param <T> the message type sent / received by the returned RemoteManager. + * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. + */ + <T> RemoteManager getInstance(final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec); + + /** * The old constructor of DefaultRemoteManagerImplementation. Avoid if you can. * * @param name the name of the returned RemoteManager to instantiate.
