Repository: incubator-reef Updated Branches: refs/heads/master 8c4456415 -> 0f632ca86
[REEF-303] Clean up the construction of Wake Transport implementation This adds TransportFactory that returns a new Transport instance instantiated by Tang and cleans up deprecated methods/annotations. JIRA: [REEF-303] https://issues.apache.org/jira/browse/REEF-303 Pull Request: Closes #195 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0f632ca8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0f632ca8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0f632ca8 Branch: refs/heads/master Commit: 0f632ca864c8356e22981f598d31c89cfd7c4a94 Parents: 8c44564 Author: taegeonum <[email protected]> Authored: Thu May 28 19:06:33 2015 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Sun May 31 01:36:54 2015 +0900 ---------------------------------------------------------------------- .../apache/reef/examples/suspend/Control.java | 9 +- .../reef/examples/suspend/SuspendClient.java | 12 +- .../examples/suspend/SuspendClientControl.java | 8 +- .../reef/io/network/TransportFactory.java | 43 ------- .../group/impl/driver/GroupCommDriverImpl.java | 25 +++- .../network/impl/MessagingTransportFactory.java | 77 ------------ .../reef/io/network/impl/NetworkService.java | 25 ++-- .../network/impl/NetworkServiceParameters.java | 3 +- .../reef/io/network/naming/NameClient.java | 30 ++++- .../io/network/naming/NameLookupClient.java | 34 ++++- .../io/network/naming/NameRegistryClient.java | 18 ++- .../reef/io/network/naming/NameServerImpl.java | 54 ++++++-- .../services/network/NetworkServiceTest.java | 3 +- .../reef/wake/remote/RemoteConfiguration.java | 13 ++ .../impl/DefaultRemoteManagerFactory.java | 17 +-- .../DefaultRemoteManagerImplementation.java | 17 ++- .../remote/impl/DefaultTransportEStage.java | 38 ++++++ .../wake/remote/transport/TransportFactory.java | 86 +++++++++++++ .../netty/MessagingTransportFactory.java | 124 +++++++++++++++++++ .../netty/NettyMessagingTransport.java | 60 +++++++-- .../reef/wake/test/remote/LargeMsgTest.java | 12 +- .../reef/wake/test/remote/RemoteTest.java | 24 ++-- .../wake/test/remote/SmallMessagesTest.java | 15 ++- .../wake/test/remote/TransportRaceTest.java | 16 ++- .../reef/wake/test/remote/TransportTest.java | 17 ++- 25 files changed, 549 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java index db312ce..c5cf879 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java @@ -35,7 +35,7 @@ import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.TransportFactory; import javax.inject.Inject; import java.io.IOException; @@ -49,14 +49,17 @@ public final class Control { private final transient String command; private final transient String taskId; private final transient int port; + private final TransportFactory tpFactory; @Inject public Control(@Parameter(SuspendClientControl.Port.class) final int port, @Parameter(TaskId.class) final String taskId, - @Parameter(Command.class) final String command) { + @Parameter(Command.class) final String command, + final TransportFactory tpFactory) { this.command = command.trim().toLowerCase(); this.taskId = taskId; this.port = port; + this.tpFactory = tpFactory; } private static Configuration getConfig(final String[] args) throws IOException, BindException { @@ -87,7 +90,7 @@ public final class Control { } }); - try (final Transport transport = new NettyMessagingTransport("localhost", 0, stage, stage, 1, 10000)) { + try (final Transport transport = tpFactory.newInstance("localhost", 0, stage, stage, 1, 10000)) { final Link link = transport.open(new InetSocketAddress("localhost", this.port), codec, null); link.write(this.command + " " + this.taskId); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java index e57e9b5..f7dbb59 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java @@ -57,15 +57,15 @@ public class SuspendClient { private final SuspendClientControl controlListener; /** - * @param reef reference to the REEF framework. - * @param port port to listen to for suspend/resume commands. - * @param numCycles number of cycles to run in the task. - * @param delay delay in seconds between cycles in the task. + * @param reef reference to the REEF framework. + * @param controlListener suspend client control listener. + * @param numCycles number of cycles to run in the task. + * @param delay delay in seconds between cycles in the task. */ @Inject SuspendClient( final REEF reef, - final @Parameter(SuspendClientControl.Port.class) int port, + final SuspendClientControl controlListener, final @Parameter(Launch.NumCycles.class) int numCycles, final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException { @@ -89,7 +89,7 @@ public class SuspendClient { this.driverConfig = cb.build(); this.reef = reef; - this.controlListener = new SuspendClientControl(port); + this.controlListener = controlListener; } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java index accb5a8..7f5e67f 100644 --- a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java +++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java @@ -28,7 +28,7 @@ import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.TransportFactory; import javax.inject.Inject; import java.io.IOException; @@ -45,9 +45,11 @@ public class SuspendClientControl implements AutoCloseable { private final transient Transport transport; private transient RunningJob runningJob; + @Inject public SuspendClientControl( - final @Parameter(SuspendClientControl.Port.class) int port) throws IOException { + final @Parameter(SuspendClientControl.Port.class) int port, + final TransportFactory tpFactory) throws IOException { LOG.log(Level.INFO, "Listen to control port {0}", port); @@ -59,7 +61,7 @@ public class SuspendClientControl implements AutoCloseable { } }); - this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000); + this.transport = tpFactory.newInstance("localhost", port, stage, stage, 1, 10000); } public synchronized void setRunningJob(final RunningJob job) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java deleted file mode 100644 index 1443a46..0000000 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.io.network; - -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Transport; - -/** - * Factory that creates a transport - */ -public interface TransportFactory { - - /** - * Creates a transport - * - * @param port a listening port - * @param clientHandler a transport client-side handler - * @param serverHandler a transport server-side handler - * @param exHandler an exception handler - * @return - */ - public Transport create(int port, - EventHandler<TransportEvent> clientHandler, - EventHandler<TransportEvent> serverHandler, - EventHandler<Exception> exHandler); -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java index adcb1e5..e453640 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java @@ -26,10 +26,6 @@ import org.apache.reef.driver.parameters.DriverIdentifier; import org.apache.reef.driver.task.FailedTask; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.impl.*; -import org.apache.reef.io.network.naming.NameServer; -import org.apache.reef.io.network.naming.NameServerImpl; -import org.apache.reef.io.network.naming.NameServerParameters; import org.apache.reef.io.network.group.api.driver.CommunicationGroupDriver; import org.apache.reef.io.network.group.api.driver.GroupCommServiceDriver; import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; @@ -39,6 +35,10 @@ import org.apache.reef.io.network.group.impl.config.parameters.TreeTopologyFanOu import org.apache.reef.io.network.group.impl.task.GroupCommNetworkHandlerImpl; import org.apache.reef.io.network.group.impl.utils.BroadcastingEventHandler; import org.apache.reef.io.network.group.impl.utils.Utils; +import org.apache.reef.io.network.impl.*; +import org.apache.reef.io.network.naming.NameServer; +import org.apache.reef.io.network.naming.NameServerImpl; +import org.apache.reef.io.network.naming.NameServerParameters; import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.JavaConfigurationBuilder; @@ -56,6 +56,8 @@ import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; +import org.apache.reef.wake.remote.transport.TransportFactory; import javax.inject.Inject; import java.util.HashMap; @@ -124,6 +126,19 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { @Parameter(DriverIdentifier.class) final String driverId, @Parameter(TreeTopologyFanOut.class) final int fanOut, final LocalAddressProvider localAddressProvider) { + this(confSerializer, driverId, fanOut, localAddressProvider, new MessagingTransportFactory()); + } + + /** + * @deprecated Have an instance injected instead. + */ + @Deprecated + @Inject + public GroupCommDriverImpl(final ConfigurationSerializer confSerializer, + @Parameter(DriverIdentifier.class) final String driverId, + @Parameter(TreeTopologyFanOut.class) final int fanOut, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory) { assert (SingletonAsserter.assertSingleton(getClass())); this.driverId = driverId; this.fanOut = fanOut; @@ -141,7 +156,7 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { this.groupCommMessageHandler = new GroupCommMessageHandler(); this.groupCommMessageStage = new SingleThreadStage<>("GroupCommMessageStage", groupCommMessageHandler, 100 * 1000); this.netService = new NetworkService<>(idFac, 0, nameServiceAddr, nameServicePort, - new GroupCommunicationMessageCodec(), new MessagingTransportFactory(localAddressProvider), + new GroupCommunicationMessageCodec(), tpFactory, new EventHandler<Message<GroupCommunicationMessage>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java deleted file mode 100644 index f9391bd..0000000 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.io.network.impl; - -import org.apache.reef.io.network.TransportFactory; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.SyncStage; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; - -import javax.inject.Inject; - -/** - * Factory that creates a messaging transport - */ -public class MessagingTransportFactory implements TransportFactory { - - private final String localAddress; - - /** - * @deprecated Have an instance injected instead. - */ - @Deprecated - @Inject - public MessagingTransportFactory(final LocalAddressProvider localAddressProvider) { - this.localAddress = localAddressProvider.getLocalAddress(); - } - - /** - * @deprecated Have an instance injected instead. - */ - @Deprecated - public MessagingTransportFactory() { - this.localAddress = LocalAddressProviderFactory.getInstance().getLocalAddress(); - } - - /** - * Creates a transport - * - * @param port a listening port - * @param clientHandler a transport client side handler - * @param serverHandler a transport server side handler - * @param exHandler a exception handler - */ - @Override - public Transport create(final int port, - final EventHandler<TransportEvent> clientHandler, - final EventHandler<TransportEvent> serverHandler, - final EventHandler<Exception> exHandler) { - - final Transport transport = new NettyMessagingTransport(this.localAddress, - port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000, RangeTcpPortProvider.Default); - - transport.registerErrorHandler(exHandler); - return transport; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java index 269fbf3..3709f43 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java @@ -23,7 +23,7 @@ import org.apache.reef.io.naming.Naming; import org.apache.reef.io.network.Connection; import org.apache.reef.io.network.ConnectionFactory; import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.TransportFactory; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.io.network.naming.NameCache; import org.apache.reef.io.network.naming.NameClient; import org.apache.reef.io.network.naming.NameLookupClient; @@ -125,18 +125,17 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { * @deprecated have an instance injected instead. */ @Deprecated - @Inject public NetworkService( - final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory factory, - final @Parameter(NetworkServiceParameters.NetworkServicePort.class) int nsPort, - final @Parameter(NameServerParameters.NameServerAddr.class) String nameServerAddr, - final @Parameter(NameServerParameters.NameServerPort.class) int nameServerPort, - final @Parameter(NameLookupClient.RetryCount.class) int retryCount, - final @Parameter(NameLookupClient.RetryTimeout.class) int retryTimeout, - final @Parameter(NetworkServiceParameters.NetworkServiceCodec.class) Codec<T> codec, - final @Parameter(NetworkServiceParameters.NetworkServiceTransportFactory.class) TransportFactory tpFactory, - final @Parameter(NetworkServiceParameters.NetworkServiceHandler.class) EventHandler<Message<T>> recvHandler, - final @Parameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class) EventHandler<Exception> exHandler) { + final IdentifierFactory factory, + final int nsPort, + final String nameServerAddr, + final int nameServerPort, + final int retryCount, + final int retryTimeout, + final Codec<T> codec, + final TransportFactory tpFactory, + final EventHandler<Message<T>> recvHandler, + final EventHandler<Exception> exHandler) { this(factory, nsPort, nameServerAddr, nameServerPort, retryCount, retryTimeout, codec, tpFactory, recvHandler, exHandler, LocalAddressProviderFactory.getInstance()); } @@ -161,7 +160,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { this.factory = factory; this.codec = codec; - this.transport = tpFactory.create(nsPort, + this.transport = tpFactory.newInstance(nsPort, new LoggingEventHandler<TransportEvent>(), new MessageHandler<T>(recvHandler, codec, factory), exHandler); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java index a31c5de..dd5e8a1 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkServiceParameters.java @@ -18,7 +18,8 @@ */ package org.apache.reef.io.network.impl; -import org.apache.reef.io.network.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java index 52688aa..3efef65 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameClient.java @@ -19,11 +19,11 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.Naming; -import org.apache.reef.util.cache.Cache; import org.apache.reef.io.network.naming.exception.NamingRuntimeException; import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; import org.apache.reef.io.network.naming.serialization.NamingMessage; import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; +import org.apache.reef.util.cache.Cache; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; @@ -34,7 +34,8 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; +import org.apache.reef.wake.remote.transport.TransportFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -117,12 +118,35 @@ public class NameClient implements Stage, Naming { final int retryTimeout, final Cache<Identifier, InetSocketAddress> cache, final LocalAddressProvider localAddressProvider) { + this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout, + cache, localAddressProvider, new MessagingTransportFactory()); + } + + /** + * Constructs a naming client + * + * @param serverAddr a server address + * @param serverPort a server port number + * @param timeout timeout in ms + * @param factory an identifier factory + * @param cache a cache + * @param tpFactory transport factory + */ + public NameClient(final String serverAddr, + final int serverPort, + final long timeout, + final IdentifierFactory factory, + final int retryCount, + final int retryTimeout, + final Cache<Identifier, InetSocketAddress> cache, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory) { final BlockingQueue<NamingLookupResponse> replyLookupQueue = new LinkedBlockingQueue<NamingLookupResponse>(); final BlockingQueue<NamingRegisterResponse> replyRegisterQueue = new LinkedBlockingQueue<NamingRegisterResponse>(); final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); - this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0, + this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0, new SyncStage<>(new NamingClientEventHandler( new NamingResponseHandler(replyLookupQueue, replyRegisterQueue), codec)), null, retryCount, retryTimeout); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java index a402ebb..fdef6f0 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameLookupClient.java @@ -20,13 +20,13 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.NameAssignment; import org.apache.reef.io.naming.NamingLookup; -import org.apache.reef.util.cache.Cache; import org.apache.reef.io.network.naming.exception.NamingException; import org.apache.reef.io.network.naming.serialization.NamingLookupRequest; import org.apache.reef.io.network.naming.serialization.NamingLookupResponse; import org.apache.reef.io.network.naming.serialization.NamingMessage; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.util.cache.Cache; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; @@ -39,7 +39,8 @@ import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; +import org.apache.reef.wake.remote.transport.TransportFactory; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -93,7 +94,6 @@ public class NameLookupClient implements Stage, NamingLookup { * @param factory an identifier factory * @param cache an cache */ - @Deprecated public NameLookupClient(final String serverAddr, final int serverPort, final IdentifierFactory factory, @@ -139,6 +139,29 @@ public class NameLookupClient implements Stage, NamingLookup { final int retryTimeout, final Cache<Identifier, InetSocketAddress> cache, final LocalAddressProvider localAddressProvider) { + this(serverAddr, serverPort, timeout, factory, retryCount, retryTimeout, + cache, localAddressProvider, new MessagingTransportFactory()); + } + + /** + * Constructs a naming lookup client + * + * @param serverAddr a server address + * @param serverPort a server port number + * @param timeout request timeout in ms + * @param factory an identifier factory + * @param cache an cache + * @param tpFactory a transport factory + */ + public NameLookupClient(final String serverAddr, + final int serverPort, + final long timeout, + final IdentifierFactory factory, + final int retryCount, + final int retryTimeout, + final Cache<Identifier, InetSocketAddress> cache, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory) { this.serverSocketAddr = new InetSocketAddress(serverAddr, serverPort); this.timeout = timeout; @@ -146,7 +169,7 @@ public class NameLookupClient implements Stage, NamingLookup { this.codec = NamingCodecFactory.createLookupCodec(factory); this.replyQueue = new LinkedBlockingQueue<>(); - this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0, + this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), 0, new SyncStage<>(new NamingLookupClientHandler( new NamingLookupResponseHandler(this.replyQueue), this.codec)), null, retryCount, retryTimeout); @@ -154,7 +177,6 @@ public class NameLookupClient implements Stage, NamingLookup { this.retryCount = retryCount; this.retryTimeout = retryTimeout; } - NameLookupClient(final String serverAddr, final int serverPort, final long timeout, final IdentifierFactory factory, final int retryCount, final int retryTimeout, final BlockingQueue<NamingLookupResponse> replyQueue, final Transport transport, @@ -170,6 +192,8 @@ public class NameLookupClient implements Stage, NamingLookup { this.retryTimeout = retryTimeout; } + + /** * Finds an address for an identifier * http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java index ba48ed7..24056dd 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java @@ -24,12 +24,16 @@ import org.apache.reef.io.network.naming.serialization.NamingMessage; import org.apache.reef.io.network.naming.serialization.NamingRegisterRequest; import org.apache.reef.io.network.naming.serialization.NamingRegisterResponse; import org.apache.reef.io.network.naming.serialization.NamingUnregisterRequest; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.Stage; import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.RemoteConfiguration; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; @@ -90,9 +94,17 @@ public class NameRegistryClient implements Stage, NamingRegistry { this.timeout = timeout; this.codec = NamingCodecFactory.createRegistryCodec(factory); this.replyQueue = new LinkedBlockingQueue<>(); - this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), 0, - new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec)), - null, 3, 10000); + + Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); + injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, + new SyncStage<>(new NamingRegistryClientHandler(new NamingRegistryResponseHandler(replyQueue), codec))); + + try { + this.transport = injector.getInstance(NettyMessagingTransport.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } } @Deprecated http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index c541c73..77c012a 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -20,19 +20,23 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.NameAssignment; import org.apache.reef.io.network.naming.serialization.*; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.impl.MultiEventHandler; import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.RemoteConfiguration; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; -import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; import org.apache.reef.webserver.AvroReefServiceInfo; import org.apache.reef.webserver.ReefEventStateManager; @@ -69,13 +73,22 @@ public class NameServerImpl implements NameServer { final IdentifierFactory factory, final LocalAddressProvider localAddressProvider) { + Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = localAddressProvider; this.reefEventStateManager = null; final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); final EventHandler<NamingMessage> handler = createEventHandler(codec); - this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), port, null, - new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000); + injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress()); + injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); + injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(new NamingServerHandler(handler, codec))); + + try { + this.transport = injector.getInstance(NettyMessagingTransport.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } this.port = transport.getListeningPort(); this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); @@ -100,15 +113,36 @@ public class NameServerImpl implements NameServer { final int port, final IdentifierFactory factory, final ReefEventStateManager reefEventStateManager) { - this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance(), RangeTcpPortProvider.Default); + this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance()); + } + + /** + * Constructs a name server + * + * @param port a listening port number + * @param factory an identifier factory + * @param reefEventStateManager the event state manager used to register name server info + * @param localAddressProvider a local address provider + * @deprecated have an instance injected instead + */ + @Deprecated + public NameServerImpl( + final int port, + final IdentifierFactory factory, + final ReefEventStateManager reefEventStateManager, + final LocalAddressProvider localAddressProvider) { + this(port, factory, reefEventStateManager, localAddressProvider, new MessagingTransportFactory()); } + /** * Constructs a name server * * @param port a listening port number * @param factory an identifier factory * @param reefEventStateManager the event state manager used to register name server info + * @param localAddressProvider a local address provider + * @param tpFactory a transport factory */ @Inject public NameServerImpl( @@ -116,16 +150,14 @@ public class NameServerImpl implements NameServer { final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory, final ReefEventStateManager reefEventStateManager, final LocalAddressProvider localAddressProvider, - final TcpPortProvider tcpPortProvider) { - + final TransportFactory tpFactory) { this.localAddressProvider = localAddressProvider; - this.reefEventStateManager = reefEventStateManager; final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory); final EventHandler<NamingMessage> handler = createEventHandler(codec); - this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), port, null, - new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000, tcpPortProvider); + this.transport = tpFactory.newInstance(localAddressProvider.getLocalAddress(), port, null, + new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000); this.port = transport.getListeningPort(); this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); @@ -138,6 +170,7 @@ public class NameServerImpl implements NameServer { LOG.log(Level.FINE, "NameServer starting, listening at port {0}", this.port); } + private EventHandler<NamingMessage> createEventHandler(final Codec<NamingMessage> codec) { final Map<Class<? extends NamingMessage>, EventHandler<? extends NamingMessage>> @@ -150,7 +183,6 @@ public class NameServerImpl implements NameServer { return handler; } - /** * Gets port */ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java index f68ef5a..9976026 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java @@ -21,14 +21,13 @@ package org.apache.reef.services.network; import org.apache.reef.exception.evaluator.NetworkException; import org.apache.reef.io.network.Connection; import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.impl.MessagingTransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.io.network.impl.NetworkService; import org.apache.reef.io.network.naming.NameServer; import org.apache.reef.io.network.naming.NameServerImpl; import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.services.network.util.Monitor; import org.apache.reef.services.network.util.StringCodec; -import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java index f438239..0f433d4 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java @@ -20,8 +20,11 @@ package org.apache.reef.wake.remote; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.impl.DefaultTransportEStage; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.apache.reef.wake.remote.impl.TransportEvent; /** * Configuration options and helper methods for Wake remoting. @@ -68,4 +71,14 @@ public final class RemoteConfiguration { public static final class RetryTimeout implements Name<Integer> { // Intentionally empty } + + @NamedParameter(doc = "Client stage for messaging transport", default_class = DefaultTransportEStage.class) + public static final class RemoteClientStage implements Name<EStage<TransportEvent>> { + // Intentionally empty + } + + @NamedParameter(doc = "Server stage for messaging transport", default_class = DefaultTransportEStage.class) + public static final class RemoteServerStage implements Name<EStage<TransportEvent>> { + // Intentionally empty + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java index e36b4dc..bb11751 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java @@ -26,6 +26,7 @@ import org.apache.reef.wake.remote.RemoteManager; import org.apache.reef.wake.remote.RemoteManagerFactory; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.TransportFactory; import javax.inject.Inject; @@ -40,7 +41,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { private final int numberOfTries; private final int retryTimeout; private final LocalAddressProvider localAddressProvider; - private final TcpPortProvider tcpPortProvider; + private final TransportFactory tpFactory; @Inject private DefaultRemoteManagerFactory( @@ -50,14 +51,14 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, final LocalAddressProvider localAddressProvider, - final TcpPortProvider tcpPortProvider) { + final TransportFactory tpFactory) { this.codec = codec; this.errorHandler = errorHandler; this.orderingGuarantee = orderingGuarantee; this.numberOfTries = numberOfTries; this.retryTimeout = retryTimeout; this.localAddressProvider = localAddressProvider; - this.tcpPortProvider = tcpPortProvider; + this.tpFactory = tpFactory; } @Override @@ -71,7 +72,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { this.numberOfTries, this.retryTimeout, this.localAddressProvider, - this.tcpPortProvider); + this.tpFactory); } @@ -95,7 +96,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { numberOfTries, retryTimeout, localAddressProvider, - tcpPortProvider); + tpFactory); } @Override @@ -116,7 +117,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { numberOfTries, retryTimeout, this.localAddressProvider, - this.tcpPortProvider); + this.tpFactory); } @@ -131,7 +132,7 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { this.numberOfTries, this.retryTimeout, this.localAddressProvider, - this.tcpPortProvider); + this.tpFactory); } @Override @@ -148,6 +149,6 @@ public class DefaultRemoteManagerFactory implements RemoteManagerFactory { this.numberOfTries, this.retryTimeout, this.localAddressProvider, - this.tcpPortProvider); + this.tpFactory); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index c53d063..7fbe59e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -25,9 +25,9 @@ import org.apache.reef.wake.impl.StageManager; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; -import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; import javax.inject.Inject; @@ -64,8 +64,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { /** * Indicates a hostname that isn't set or known. */ - public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##"; - + public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME; /** * @deprecated have an instance injected instead. Or use RemoteManagerFactory.getInstance() @@ -89,8 +88,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { numberOfTries, retryTimeout, LocalAddressProviderFactory.getInstance(), - RangeTcpPortProvider.Default); - + new MessagingTransportFactory()); } /** @@ -108,7 +106,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, final LocalAddressProvider localAddressProvider, - final TcpPortProvider tcpPortProvider) { + final TransportFactory tpFactory) { this.name = name; this.handlerContainer = new HandlerContainer<>(name, codec); @@ -117,9 +115,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); - final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; - this.transport = new NettyMessagingTransport( - host, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); + this.transport = tpFactory.newInstance( + hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); this.handlerContainer.setTransport(this.transport); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java new file mode 100644 index 0000000..8ab669e --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.impl; + +import org.apache.reef.wake.EStage; + +import javax.inject.Inject; + +public class DefaultTransportEStage implements EStage<TransportEvent> { + + @Inject + public DefaultTransportEStage() { + } + + @Override + public void onNext(TransportEvent value) { + } + + @Override + public void close() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java new file mode 100644 index 0000000..a09339b --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/TransportFactory.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport; + +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; + +/** + * Factory that creates a transport + */ +@DefaultImplementation(MessagingTransportFactory.class) +public interface TransportFactory { + + /** + * Creates a transport + * + * @param port a listening port + * @param clientHandler a transport client-side handler + * @param serverHandler a transport server-side handler + * @param exHandler an exception handler + * @return transport + */ + public Transport newInstance(int port, + EventHandler<TransportEvent> clientHandler, + EventHandler<TransportEvent> serverHandler, + EventHandler<Exception> exHandler); + + /** + * Creates a transport + * + * @param hostAddress a host address + * @param port a listening port + * @param clientStage a transport client-side stage + * @param serverStage a transport server-side stage + * @param numberOfTries the number of retries for connection + * @param retryTimeout retry timeout + * @return transport + */ + public Transport newInstance(final String hostAddress, int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout); + + /** + * Creates a transport + * + * @param hostAddress a host address + * @param port a listening port + * @param clientStage a transport client-side stage + * @param serverStage a transport server-side stage + * @param numberOfTries the number of retries for connection + * @param retryTimeout retry timeout + * @param tcpPortProvider tcpPortProvider + * @return transport + */ + public Transport newInstance(final String hostAddress, + int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout, + final TcpPortProvider tcpPortProvider); + + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java new file mode 100644 index 0000000..b77f804 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.transport.netty; + +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SyncStage; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; + +/** + * Factory that creates a messaging transport + */ +public class MessagingTransportFactory implements TransportFactory { + + private final String localAddress; + + /** + * @deprecated Have an instance injected instead. + */ + @Deprecated + @Inject + public MessagingTransportFactory(final LocalAddressProvider localAddressProvider) { + this.localAddress = localAddressProvider.getLocalAddress(); + } + + /** + * @deprecated Have an instance injected instead. + */ + @Deprecated + public MessagingTransportFactory() { + this.localAddress = LocalAddressProviderFactory.getInstance().getLocalAddress(); + } + + /** + * Creates a transport + * + * @param port a listening port + * @param clientHandler a transport client side handler + * @param serverHandler a transport server side handler + * @param exHandler a exception handler + */ + @Override + public Transport newInstance(final int port, + final EventHandler<TransportEvent> clientHandler, + final EventHandler<TransportEvent> serverHandler, + final EventHandler<Exception> exHandler) { + + Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress); + injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); + injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler)); + injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, new SyncStage<>(serverHandler)); + + final Transport transport; + try { + transport = injector.getInstance(NettyMessagingTransport.class); + transport.registerErrorHandler(exHandler); + return transport; + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + @Override + public Transport newInstance(final String hostAddress, int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout) { + return newInstance(hostAddress, port, clientStage, + serverStage, numberOfTries, retryTimeout, RangeTcpPortProvider.Default); + } + + @Override + public Transport newInstance(final String hostAddress, int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout, + final TcpPortProvider tcpPortProvider) { + + Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); + injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); + injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage); + injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class, serverStage); + injector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries); + injector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout); + injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); + try { + return injector.getInstance(NettyMessagingTransport.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index a1f0886..dff38bd 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -30,10 +30,14 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.DefaultThreadFactory; import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.exception.RemoteRuntimeException; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; @@ -43,6 +47,7 @@ import org.apache.reef.wake.remote.transport.LinkListener; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; +import javax.inject.Inject; import java.io.IOException; import java.net.BindException; import java.net.ConnectException; @@ -88,6 +93,11 @@ public class NettyMessagingTransport implements Transport { private final int numberOfTries; private final int retryTimeout; + /** + * Indicates a hostname that isn't set or known. + */ + public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##"; + /** * Constructs a messaging transport @@ -99,19 +109,49 @@ public class NettyMessagingTransport implements Transport { * @param numberOfTries the number of tries of connection * @param retryTimeout the timeout of reconnection * @param tcpPortProvider gives an iterator that produces random tcp ports in a range + * @deprecated use the constructor that takes a LocalAddressProvider instead. + */ + @Deprecated + public NettyMessagingTransport( + final String hostAddress, + int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout, + final TcpPortProvider tcpPortProvider) { + + this(hostAddress, port, clientStage, serverStage, numberOfTries, + retryTimeout, tcpPortProvider, LocalAddressProviderFactory.getInstance()); + } + /** + * Constructs a messaging transport * + * @param hostAddress the server host address + * @param port the server listening port; when it is 0, randomly assign a port number + * @param clientStage the client-side stage that handles transport events + * @param serverStage the server-side stage that handles transport events + * @param numberOfTries the number of tries of connection + * @param retryTimeout the timeout of reconnection + * @param tcpPortProvider gives an iterator that produces random tcp ports in a range */ - public NettyMessagingTransport(final String hostAddress, int port, - final EStage<TransportEvent> clientStage, - final EStage<TransportEvent> serverStage, - final int numberOfTries, - final int retryTimeout, - final TcpPortProvider tcpPortProvider) { + @Inject + NettyMessagingTransport( + final @Parameter(RemoteConfiguration.HostAddress.class) String hostAddress, + @Parameter(RemoteConfiguration.Port.class) int port, + final @Parameter(RemoteConfiguration.RemoteClientStage.class) EStage<TransportEvent> clientStage, + final @Parameter(RemoteConfiguration.RemoteServerStage.class) EStage<TransportEvent> serverStage, + final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, + final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, + final TcpPortProvider tcpPortProvider, + final LocalAddressProvider localAddressProvider) { if (port < 0) { throw new RemoteRuntimeException("Invalid server port: " + port); } + final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; + this.numberOfTries = numberOfTries; this.retryTimeout = retryTimeout; this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage); @@ -143,7 +183,7 @@ public class NettyMessagingTransport implements Transport { Channel acceptor = null; try { if (port > 0) { - acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); + acceptor = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel(); } else { Iterator<Integer> ports = tcpPortProvider.iterator(); while (acceptor == null) { @@ -151,7 +191,7 @@ public class NettyMessagingTransport implements Transport { port = ports.next(); LOG.log(Level.FINEST, "Try port {0}", port); try { - acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); + acceptor = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel(); } catch (final Exception ex) { if (ex instanceof BindException) { LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); @@ -174,7 +214,7 @@ public class NettyMessagingTransport implements Transport { this.acceptor = acceptor; this.serverPort = port; - this.localAddress = new InetSocketAddress(hostAddress, this.serverPort); + this.localAddress = new InetSocketAddress(host, this.serverPort); LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress); } @@ -188,7 +228,7 @@ public class NettyMessagingTransport implements Transport { * @param serverStage the server-side stage that handles transport events * @param numberOfTries the number of tries of connection * @param retryTimeout the timeout of reconnection - * @deprecated use the constructor that takes a TcpProvider instead + * @deprecated use the constructor that takes a TcpProvider and LocalAddressProvider instead. */ @Deprecated public NettyMessagingTransport(final String hostAddress, int port, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java index 3defe01..c0c6a01 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java @@ -18,6 +18,7 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; @@ -27,10 +28,10 @@ import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.PassThroughEncoder; import org.apache.reef.wake.test.util.TimeoutHandler; @@ -46,13 +47,16 @@ import java.util.logging.Level; */ public class LargeMsgTest { private final LocalAddressProvider localAddressProvider; + private final TransportFactory tpFactory; private final static byte[][] values = new byte[3][]; private final static int l0 = 1 << 25; private final static int l1 = 1 << 2; private final static int l2 = 1 << 21; public LargeMsgTest() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.tpFactory = injector.getInstance(TransportFactory.class); } @BeforeClass @@ -87,7 +91,7 @@ public class LargeMsgTest { final String hostAddress = this.localAddressProvider.getLocalAddress(); int port = 7001; - NettyMessagingTransport transport = new NettyMessagingTransport(hostAddress, port, clientStage, serverStage, 1, 10000); + Transport transport = tpFactory.newInstance(hostAddress, port, clientStage, serverStage, 1, 10000); final Link<byte[]> link = transport.open(new InetSocketAddress(hostAddress, port), new PassThroughEncoder(), null); EStage<byte[]> writeSubmitter = new ThreadPoolStage<>("Submitter", new EventHandler<byte[]>() { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java index 636315f..35f879a 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java @@ -18,17 +18,22 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.MultiEventHandler; import org.apache.reef.wake.impl.TimerStage; -import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.Decoder; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.RemoteIdentifier; +import org.apache.reef.wake.remote.RemoteIdentifierFactory; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.*; import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -44,14 +49,17 @@ import java.util.logging.Level; public class RemoteTest { private final LocalAddressProvider localAddressProvider; + private final TransportFactory tpFactory; @Rule public final TestName name = new TestName(); final String logPrefix = "TEST "; - public RemoteTest() { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + public RemoteTest() throws InjectionException { + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.tpFactory = injector.getInstance(TransportFactory.class); } @Test @@ -87,10 +95,10 @@ public class RemoteTest { final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport - Transport transport1 = new NettyMessagingTransport(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000); + Transport transport1 = tpFactory.newInstance(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000); int port1 = transport1.getListeningPort(); - Transport transport2 = new NettyMessagingTransport(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000); + Transport transport2 = tpFactory.newInstance(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000); int port2 = transport2.getListeningPort(); transport1.close(); @@ -132,7 +140,7 @@ public class RemoteTest { final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport - Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); + Transport transport = tpFactory.newInstance(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); // mux encoder with encoder map Map<Class<?>, Encoder<?>> clazzToEncoderMap = new HashMap<Class<?>, Encoder<?>>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java index fdced04..6e93d67 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java @@ -18,6 +18,9 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.MultiEventHandler; @@ -27,10 +30,9 @@ import org.apache.reef.wake.remote.Encoder; import org.apache.reef.wake.remote.RemoteIdentifier; import org.apache.reef.wake.remote.RemoteIdentifierFactory; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.*; import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -43,9 +45,12 @@ import java.util.logging.Level; public class SmallMessagesTest { private final LocalAddressProvider localAddressProvider; + private final TransportFactory tpFactory; - public SmallMessagesTest() { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + public SmallMessagesTest() throws InjectionException { + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.tpFactory = injector.getInstance(TransportFactory.class); } @Rule @@ -87,7 +92,7 @@ public class SmallMessagesTest { final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport - Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); + Transport transport = tpFactory.newInstance(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); // mux encoder with encoder map Map<Class<?>, Encoder<?>> clazzToEncoderMap = new HashMap<Class<?>, Encoder<?>>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java index bdb480d..b8fd236 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java @@ -18,6 +18,9 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; @@ -25,10 +28,10 @@ import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.PassThroughEncoder; import org.apache.reef.wake.test.util.TimeoutHandler; @@ -41,9 +44,12 @@ import java.util.logging.Level; public class TransportRaceTest { private final LocalAddressProvider localAddressProvider; + private final TransportFactory tpFactory; - public TransportRaceTest() { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + public TransportRaceTest() throws InjectionException { + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.tpFactory = injector.getInstance(TransportFactory.class); } @Test @@ -60,7 +66,7 @@ public class TransportRaceTest { serverHandler, 1, new LoggingEventHandler<Throwable>()); final String hostAddress = this.localAddressProvider.getLocalAddress(); int port = 7001; - NettyMessagingTransport transport = new NettyMessagingTransport( + Transport transport = tpFactory.newInstance( hostAddress, port, clientStage, serverStage, 1, 10000); String value = "Test Race"; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0f632ca8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java index 8176a70..3ad767a 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java @@ -18,18 +18,20 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; -import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; +import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -44,9 +46,12 @@ import java.util.logging.Level; public class TransportTest { private final LocalAddressProvider localAddressProvider; + private final TransportFactory tpFactory; - public TransportTest() { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + public TransportTest() throws InjectionException { + final Injector injector = Tang.Factory.getTang().newInjector(); + this.localAddressProvider = injector.getInstance(LocalAddressProvider.class); + this.tpFactory = injector.getInstance(TransportFactory.class); } final String logPrefix = "TEST "; @@ -68,7 +73,7 @@ public class TransportTest { // Codec<String> ReceiverStage<String> stage = new ReceiverStage<String>(new ObjectSerializableCodec<String>(), monitor, expected); - Transport transport = new NettyMessagingTransport(hostAddress, port, stage, stage, 1, 10000); + Transport transport = tpFactory.newInstance(hostAddress, port, stage, stage, 1, 10000); // sending side Link<String> link = transport.open( @@ -99,7 +104,7 @@ public class TransportTest { // Codec<TestEvent> ReceiverStage<TestEvent> stage = new ReceiverStage<TestEvent>(new ObjectSerializableCodec<TestEvent>(), monitor, expected); - Transport transport = new NettyMessagingTransport(hostAddress, port, stage, stage, 1, 10000); + Transport transport = tpFactory.newInstance(hostAddress, port, stage, stage, 1, 10000); // sending side Link<TestEvent> link = transport.open(
