Repository: reef Updated Branches: refs/heads/master 7d62431bf -> 1a329b4c3
[REEF-548] Remove APIs deprecated since 0.11 from REEF-IO This change * makes GroupCommDriverImpl, NameServerImpl and NetworkService constructors private. * replaces direct calls to NetworkService constructor with injections. JIRA: [REEF-548](https://issues.apache.org/jira/browse/REEF-548) Pull request: This closes #958 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1a329b4c Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1a329b4c Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1a329b4c Branch: refs/heads/master Commit: 1a329b4c375a88b88e8618de339bdff730b8061e Parents: 7d62431 Author: Mariia Mykhailova <[email protected]> Authored: Fri Apr 8 16:42:17 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Apr 20 10:27:17 2016 -0700 ---------------------------------------------------------------------- .../group/impl/driver/GroupCommDriverImpl.java | 41 +++--- .../reef/io/network/impl/NetworkService.java | 12 +- .../reef/io/network/naming/NameClient.java | 1 - .../reef/io/network/naming/NameServerImpl.java | 4 +- .../reef/io/network/NetworkServiceTest.java | 128 +++++++++++++------ 5 files changed, 119 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1a329b4c/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java index 6c8c72f..b09b9e1 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java @@ -73,7 +73,7 @@ import java.util.logging.Logger; * <p> * Also starts the NameService and the NetworkService on the driver */ -public class GroupCommDriverImpl implements GroupCommServiceDriver { +public final class GroupCommDriverImpl implements GroupCommServiceDriver { private static final Logger LOG = Logger.getLogger(GroupCommDriverImpl.class.getName()); /** * TANG instance. @@ -107,12 +107,8 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { private final EStage<GroupCommunicationMessage> groupCommMessageStage; private final int fanOut; - /** - * @deprecated in 0.12. Use Tang to obtain an instance of this instead. - */ - @Deprecated @Inject - public GroupCommDriverImpl(final ConfigurationSerializer confSerializer, + private GroupCommDriverImpl(final ConfigurationSerializer confSerializer, @Parameter(DriverIdentifier.class) final String driverId, @Parameter(TreeTopologyFanOut.class) final int fanOut, final LocalAddressProvider localAddressProvider, @@ -140,22 +136,33 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { .build()) .build(); - NameResolver nameResolver = null; + final NameResolver nameResolver; try { nameResolver = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class); } catch (final InjectionException e) { - throw new RuntimeException(e); + throw new RuntimeException("Failed to instantiate NameResolver", e); } - this.netService = new NetworkService<>(idFac, 0, nameResolver, - new GroupCommunicationMessageCodec(), tpFactory, - new EventHandler<Message<GroupCommunicationMessage>>() { - - @Override - public void onNext(final Message<GroupCommunicationMessage> msg) { - groupCommMessageStage.onNext(Utils.getGCM(msg)); - } - }, new LoggingEventHandler<Exception>(), localAddressProvider); + try { + final Injector injector = TANG.newInjector(); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, idFac); + injector.bindVolatileInstance(NameResolver.class, nameResolver); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, + new GroupCommunicationMessageCodec()); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, tpFactory); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new EventHandler<Message<GroupCommunicationMessage>>() { + @Override + public void onNext(final Message<GroupCommunicationMessage> msg) { + groupCommMessageStage.onNext(Utils.getGCM(msg)); + } + }); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new LoggingEventHandler<Exception>()); + this.netService = injector.getInstance(NetworkService.class); + } catch (final InjectionException e) { + throw new RuntimeException("Failed to instantiate NetworkService", e); + } this.netService.registerId(idFac.getNewInstance(driverId)); final EStage<GroupCommunicationMessage> senderStage = new ThreadPoolStage<>("SrcCtrlMsgSender", new CtrlMsgSender(idFac, netService), 5); http://git-wip-us.apache.org/repos/asf/reef/blob/1a329b4c/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java index 679f1de..5f77d69 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java @@ -29,7 +29,6 @@ import org.apache.reef.wake.*; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.impl.SingleThreadStage; import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.TransportFactory; @@ -58,21 +57,16 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { private final EStage<Identifier> nameServiceUnregisteringStage; private Identifier myId; - /** - * @deprecated in 0.12. Use Tang to obtain an instance of this instead. - */ - @Deprecated @Inject - public NetworkService( + private NetworkService( @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) final IdentifierFactory factory, @Parameter(NetworkServiceParameters.NetworkServicePort.class) final int nsPort, final NameResolver nameResolver, @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) final Codec<T> codec, @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) final TransportFactory tpFactory, @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) final EventHandler<Message<T>> recvHandler, - @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) final EventHandler<Exception> exHandler, - final LocalAddressProvider localAddressProvider) { - + @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) + final EventHandler<Exception> exHandler) { this.factory = factory; this.codec = codec; this.transport = tpFactory.newInstance(nsPort, http://git-wip-us.apache.org/repos/asf/reef/blob/1a329b4c/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java index 7e1a4eb..47ed69f 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java @@ -65,7 +65,6 @@ public final class NameClient implements NameResolver { * @param tpFactory transport factory */ @Inject - @SuppressWarnings("deprecation") private NameClient( @Parameter(NameResolverNameServerAddr.class) final String serverAddr, @Parameter(NameResolverNameServerPort.class) final int serverPort, http://git-wip-us.apache.org/repos/asf/reef/blob/1a329b4c/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index 053e4cf..eade716 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -61,11 +61,9 @@ public final class NameServerImpl implements NameServer { * @param factory an identifier factory * @param localAddressProvider a local address provider * Constructs a name server - * @deprecated in 0.12. Use Tang to obtain an instance of this or, better, NameServer, instead. */ - @Deprecated @Inject - public NameServerImpl( + private NameServerImpl( @Parameter(NameServerParameters.NameServerPort.class) final int port, @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory, final LocalAddressProvider localAddressProvider) { http://git-wip-us.apache.org/repos/asf/reef/blob/1a329b4c/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java index 24e899e..373fd31 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java @@ -21,6 +21,7 @@ package org.apache.reef.io.network; import org.apache.commons.lang3.StringUtils; import org.apache.reef.exception.evaluator.NetworkException; import org.apache.reef.io.network.impl.NetworkService; +import org.apache.reef.io.network.impl.NetworkServiceParameters; import org.apache.reef.io.network.naming.NameResolver; import org.apache.reef.io.network.naming.NameResolverConfiguration; import org.apache.reef.io.network.naming.NameServer; @@ -97,13 +98,24 @@ public class NetworkServiceTest { LOG.log(Level.FINEST, "=== Test network service receiver start"); LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - final NetworkService<String> ns2 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - final NetworkService<String> ns1 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { + try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class)) { + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory); + injector2.bindVolatileInstance(NameResolver.class, nameResolver); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec()); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, + injector.getInstance(MessagingTransportFactory.class)); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new ExceptionHandler()); + + final Injector injectorNs2 = injector2.forkInjector(); + injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name2, monitor, numMessages)); + final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class); + + final Injector injectorNs1 = injector2.forkInjector(); + injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name1, null, 0)); + final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class); ns2.registerId(factory.getNewInstance(name2)); final int port2 = ns2.getTransport().getListeningPort(); @@ -166,13 +178,24 @@ public class NetworkServiceTest { LOG.log(Level.FINEST, "=== Test network service receiver start"); LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { + try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class)) { + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory); + injector2.bindVolatileInstance(NameResolver.class, nameResolver); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec()); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, + injector.getInstance(MessagingTransportFactory.class)); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new ExceptionHandler()); + + final Injector injectorNs2 = injector2.forkInjector(); + injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name2, monitor, numMessages)); + final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class); + + final Injector injectorNs1 = injector2.forkInjector(); + injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name1, null, 0)); + final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class); ns2.registerId(factory.getNewInstance(name2)); final int port2 = ns2.getTransport().getListeningPort(); @@ -252,14 +275,24 @@ public class NetworkServiceTest { LOG.log(Level.FINEST, "=== Test network service receiver start"); LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name2, monitor, numMessages), - new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { + try (final NameResolver nameResolver = injector.getInstance(NameResolver.class)) { + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory); + injector.bindVolatileInstance(NameResolver.class, nameResolver); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec()); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, + injector.getInstance(MessagingTransportFactory.class)); + injector.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new ExceptionHandler()); + + final Injector injectorNs2 = injector.forkInjector(); + injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name2, monitor, numMessages)); + final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class); + + final Injector injectorNs1 = injector.forkInjector(); + injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name1, null, 0)); + final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class); ns2.registerId(factory.getNewInstance(name2)); final int port2 = ns2.getTransport().getListeningPort(); @@ -342,14 +375,24 @@ public class NetworkServiceTest { LOG.log(Level.FINEST, "=== Test network service receiver start"); LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name2, monitor, totalNumMessages), - new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { + try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class)) { + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory); + injector2.bindVolatileInstance(NameResolver.class, nameResolver); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec()); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, + injector.getInstance(MessagingTransportFactory.class)); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new ExceptionHandler()); + + final Injector injectorNs2 = injector2.forkInjector(); + injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name2, monitor, numMessages)); + final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class); + + final Injector injectorNs1 = injector2.forkInjector(); + injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name1, null, 0)); + final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class); ns2.registerId(factory.getNewInstance(name2)); final int port2 = ns2.getTransport().getListeningPort(); @@ -433,13 +476,24 @@ public class NetworkServiceTest { LOG.log(Level.FINEST, "=== Test network service receiver start"); LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<>(factory, 0, nameResolver, - new StringCodec(), injector.getInstance(MessagingTransportFactory.class), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { + try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class)) { + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory); + injector2.bindVolatileInstance(NameResolver.class, nameResolver); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec()); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class, + injector.getInstance(MessagingTransportFactory.class)); + injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class, + new ExceptionHandler()); + + final Injector injectorNs2 = injector2.forkInjector(); + injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name2, monitor, numMessages)); + final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class); + + final Injector injectorNs1 = injector2.forkInjector(); + injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class, + new MessageHandler<String>(name1, null, 0)); + final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class); ns2.registerId(factory.getNewInstance(name2)); final int port2 = ns2.getTransport().getListeningPort();
