Repository: incubator-reef Updated Branches: refs/heads/master ac380af06 -> a72a40f8d
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/HostnameBasedLocalAddressProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/HostnameBasedLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/HostnameBasedLocalAddressProvider.java new file mode 100644 index 0000000..844cb8e --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/HostnameBasedLocalAddressProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+
+import javax.inject.Inject;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * LocalAddressProvider backed by <code>Inet4Address.getLocalHost().getHostAddress()</code>.
+ * The address is looked up on the first call and served from a field afterwards.
+ */
+public final class HostnameBasedLocalAddressProvider implements LocalAddressProvider {
+  private static final Logger LOG = Logger.getLogger(HostnameBasedLocalAddressProvider.class.getName());
+
+  /** Result of the first successful lookup; null until then. Only accessed from the synchronized getter. */
+  private String cached;
+
+  /**
+   * Private on purpose: instances are meant to be created by Tang via injection.
+   */
+  @Inject
+  private HostnameBasedLocalAddressProvider() {
+    LOG.log(Level.INFO, "Instantiating HostnameBasedLocalAddressProvider");
+  }
+
+  /**
+   * @return the host address of the local host; looked up on the first call, cached afterwards.
+   * @throws RuntimeException if the local host cannot be resolved.
+   */
+  @Override
+  public synchronized String getLocalAddress() {
+    if (this.cached != null) {
+      return this.cached;
+    }
+    try {
+      this.cached = Inet4Address.getLocalHost().getHostAddress();
+    } catch (final UnknownHostException cause) {
+      final String errorText = "Unable to resolve LocalHost. This is fatal.";
+      LOG.log(Level.SEVERE, errorText, cause);
+      throw new RuntimeException(errorText, cause);
+    }
+    assert (this.cached != null);
+    return this.cached;
+  }
+
+  /**
+   * @return a Configuration that binds LocalAddressProvider to this implementation.
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder()
+        .bind(LocalAddressProvider.class, HostnameBasedLocalAddressProvider.class)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
new file mode 100644
index 0000000..7ab18fc
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.wake.exception.WakeRuntimeException;
+
+import javax.inject.Inject;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of LocalAddressProvider using the (removed) code from NetUtils.getLocalAddress().
+ * <p>
+ * It collects the IPv4 addresses of all network interfaces, orders them with
+ * {@link AddressComparator} and returns the first one. The result is memoized.
+ */
+public final class LegacyLocalAddressProvider implements LocalAddressProvider {
+  private static final Logger LOG = Logger.getLogger(LegacyLocalAddressProvider.class.getName());
+
+  /** Memoized result of the first successful lookup; holds null until then. */
+  private final AtomicReference<String> cached = new AtomicReference<>();
+
+  /**
+   * Injectable constructor for Tang only.
+   */
+  @Inject
+  private LegacyLocalAddressProvider() {
+    LOG.log(Level.INFO, "Instantiating LegacyLocalAddressProvider");
+  }
+
+  /**
+   * @return the textual form of the first IPv4 address in {@link AddressComparator} order.
+   * @throws WakeRuntimeException if no IPv4 address is found or the interfaces cannot be enumerated.
+   */
+  @Override
+  public String getLocalAddress() {
+    // This method is surprisingly slow: It was causing unit test timeouts, so we memoize the result.
+    if (cached.get() == null) {
+      try {
+        final TreeSet<Inet4Address> sortedAddrs = new TreeSet<>(new AddressComparator());
+        // getNetworkInterfaces() is documented to return null when no interface could be found.
+        // Treat that like an empty enumeration so that we fail with the descriptive exception below
+        // rather than with a NullPointerException.
+        final Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+        // There is an idea of virtual / subinterfaces exposed by java here.
+        // We're not walking around looking for those because the javadoc says:
+        // "NOTE: can use getNetworkInterfaces()+getInetAddresses() to obtain all IP addresses for this node"
+        while (ifaces != null && ifaces.hasMoreElements()) {
+          final NetworkInterface iface = ifaces.nextElement();
+          // iface.isUp() is deliberately not consulted: it leads to slowness and
+          // non-deterministic return values.
+          final Enumeration<InetAddress> addrs = iface.getInetAddresses();
+          while (addrs.hasMoreElements()) {
+            final InetAddress a = addrs.nextElement();
+            if (a instanceof Inet4Address) {
+              sortedAddrs.add((Inet4Address) a);
+            }
+          }
+        }
+        if (sortedAddrs.isEmpty()) {
+          throw new WakeRuntimeException("This machine apparently doesn't have any IP addresses (not even 127.0.0.1) on interfaces that are up.");
+        }
+        cached.set(sortedAddrs.pollFirst().getHostAddress());
+        LOG.log(Level.FINE, "Local address is {0}", cached.get());
+      } catch (final SocketException e) {
+        // Chain the SocketException itself: e.getCause() may be null, which would
+        // discard the actual failure and its stack trace.
+        throw new WakeRuntimeException("Unable to get local host address", e);
+      }
+    }
+    return cached.get();
+  }
+
+  /**
+   * @return a Configuration that binds LocalAddressProvider to this implementation.
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder()
+        .bind(LocalAddressProvider.class, LegacyLocalAddressProvider.class)
+        .build();
+  }
+
+  /**
+   * Orders IPv4 addresses so that addresses whose first octet is 127 come last;
+   * all other addresses are compared octet by octet.
+   */
+  private static class AddressComparator implements Comparator<Inet4Address> {
+
+    /**
+     * Widens a byte to int WITHOUT masking it with 0xff, so octets 128..255 compare as
+     * negative values (e.g. 192.x.x.x orders before 10.x.x.x).
+     * NOTE(review): presumably kept unmasked to preserve the address choice of the legacy
+     * NetUtils code this class reproduces - confirm before switching to an unsigned comparison.
+     */
+    private static int u(byte b) {
+      return (int) b;
+    }
+
+    @Override
+    public int compare(Inet4Address aa, Inet4Address ba) {
+      byte[] a = aa.getAddress();
+      byte[] b = ba.getAddress();
+      // local subnet comes after all else.
+      if (a[0] == 127 && b[0] != 127) {
+        return 1;
+      }
+      if (a[0] != 127 && b[0] == 127) {
+        return -1;
+      }
+      for (int i = 0; i < 4; i++) {
+        if (u(a[i]) < u(b[i])) {
+          return -1;
+        }
+        if (u(a[i]) > u(b[i])) {
+          return 1;
+        }
+      }
+      return 0;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProvider.java
new file mode 100644
index 0000000..e0fd9c2
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.reef.tang.ConfigurationProvider;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Injectable class that provides the local address of the node to bind to.
+ */
+@DefaultImplementation(HostnameBasedLocalAddressProvider.class)
+public interface LocalAddressProvider extends ConfigurationProvider {
+
+  /**
+   * Gets the address of the local node.
+   *
+   * @return a String representation of the local address.
+   */
+  String getLocalAddress();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
new file mode 100644
index 0000000..e6f4a3c
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Uses Tang to create the default LocalAddressProvider.
+ * <p>
+ * The provider is created lazily on the first call to {@link #getInstance()} and then
+ * shared by all callers. Creation is guarded by the class lock, so concurrent first calls
+ * observe the same instance.
+ *
+ * @deprecated Have an instance of LocalAddressProvider injected instead.
+ */
+@Deprecated
+public final class LocalAddressProviderFactory {
+  private static final Logger LOGGER = Logger.getLogger(LocalAddressProviderFactory.class.getName());
+
+  /** The shared instance; null until the first call to getInstance(). Guarded by the class lock. */
+  private static LocalAddressProvider instance = null;
+
+  /**
+   * This class shall never be instantiated.
+   */
+  private LocalAddressProviderFactory() {
+    // Intentionally left blank.
+  }
+
+  /**
+   * Returns the shared default LocalAddressProvider, creating it on first use.
+   * <p>
+   * Synchronized because the check-then-create sequence is otherwise racy: two threads could
+   * both see a null instance, both call makeInstance(), and the second would violate the
+   * "not yet created" assertion there (or silently replace the instance).
+   *
+   * @return the default LocalAddressProvider
+   * @throws RuntimeException if Tang fails to inject the default LocalAddressProvider.
+   * @deprecated Have an instance of LocalAddressProvider injected instead.
+   */
+  public static synchronized LocalAddressProvider getInstance() {
+    if (null == instance) {
+      makeInstance();
+    }
+    return instance;
+  }
+
+  /**
+   * Makes the instance. Must only be called from getInstance(), i.e. while holding the class lock.
+   */
+  private static void makeInstance() {
+    assert (null == instance);
+    try {
+      LOGGER.log(Level.FINER, "Instantiating default LocalAddressProvider for legacy users.");
+      instance = Tang.Factory.getTang().newInjector().getInstance(LocalAddressProvider.class);
+      LOGGER.log(Level.FINER, "Instantiated default LocalAddressProvider for legacy users.");
+    } catch (InjectionException e) {
+      throw new RuntimeException("Unable to instantiate default LocalAddressProvider for legacy users.", e);
+    }
+    assert (null != instance);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/package-info.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/package-info.java
new file mode 100644
index 0000000..79b2aa7
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Local address resolution.
+ */ +package org.apache.reef.wake.remote.address; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index fc6b804..ff1679c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,6 +23,8 @@ import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.StageManager; import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -58,6 +60,36 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator(); private RemoteIdentifier myIdentifier; + + /** + * @deprecated have an instance injected instead. 
+ */ + @Deprecated + public <T> DefaultRemoteManagerImplementation( + final String name, + final String hostAddress, + final int listeningPort, + final Codec<T> codec, + final EventHandler<Throwable> errorHandler, + final boolean orderingGuarantee, + final int numberOfTries, + final int retryTimeout) { + this(name, + hostAddress, + listeningPort, + codec, + errorHandler, + orderingGuarantee, + numberOfTries, + retryTimeout, + LocalAddressProviderFactory.getInstance()); + + } + + /** + * @deprecated have an instance injected instead. + */ + @Deprecated @Inject public <T> DefaultRemoteManagerImplementation( final @Parameter(RemoteConfiguration.ManagerName.class) String name, @@ -67,7 +99,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { final @Parameter(RemoteConfiguration.ErrorHandler.class) EventHandler<Throwable> errorHandler, final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean orderingGuarantee, final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, - final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout) { + final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, + final LocalAddressProvider localAddressProvider) { this.name = name; this.handlerContainer = new HandlerContainer<>(name, codec); @@ -78,7 +111,7 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { if ("##UNKNOWN##".equals(hostAddress)) { this.transport = new NettyMessagingTransport( - NetUtils.getLocalAddress(), listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); + localAddressProvider.getLocalAddress(), listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); } else { this.transport = new NettyMessagingTransport( hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); @@ -93,10 +126,10 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { 
StageManager.instance().register(this); - LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}", + LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. Binding address provided by {5}", new Object[]{this.name, this.myIdentifier, counter.incrementAndGet(), this.transport.getLocalAddress().toString(), - this.transport.getListeningPort()} + this.transport.getListeningPort(), localAddressProvider} ); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java index db59b34..3defe01 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,13 +18,16 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.impl.TimerStage; -import org.apache.reef.wake.remote.NetUtils; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -35,7 +38,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.logging.Level; @@ -43,11 +45,16 @@ import java.util.logging.Level; * Test transferring large messages */ public class LargeMsgTest { + private final LocalAddressProvider localAddressProvider; private final static byte[][] values = new byte[3][]; private final static int l0 = 1 << 25; private final static int l1 = 1 << 2; private final static int l2 = 1 << 21; + public LargeMsgTest() throws InjectionException { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + @BeforeClass public static void setUpBeforeClass() { values[0] = new byte[l0]; @@ -78,7 +85,7 @@ public class LargeMsgTest { EStage<TransportEvent> serverStage = new ThreadPoolStage<>("server@7001", new ServerHandler(monitor, dataSize), 1, new LoggingEventHandler<Throwable>()); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); int port = 7001; NettyMessagingTransport 
transport = new NettyMessagingTransport(hostAddress, port, clientStage, serverStage, 1, 10000); final Link<byte[]> link = transport.open(new InetSocketAddress(hostAddress, port), new PassThroughEncoder(), null); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java index fa85651..c003cb5 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,14 +18,14 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.impl.DefaultIdentifierFactory; import org.apache.reef.wake.impl.LoggingEventHandler; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.NetUtils; -import org.apache.reef.wake.remote.RemoteIdentifier; -import org.apache.reef.wake.remote.RemoteManager; +import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; import org.junit.Assert; @@ -59,27 +59,25 @@ public class RemoteIdentifierFactoryTest { @Test public void testRemoteManagerIdentifier() throws Exception { - System.out.println(logPrefix + name.getMethodName()); + final Injector injector = Tang.Factory.getTang().newInjector(); + final LocalAddressProvider localAddressProvider = injector.getInstance(LocalAddressProvider.class); - int port = 9100; - Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); + final int port = 9100; + final Map<Class<?>, Codec<?>> clazzToCodecMap = new HashMap<Class<?>, Codec<?>>(); clazzToCodecMap.put(TestEvent.class, new TestEventCodec()); - Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - - String hostAddress = NetUtils.getLocalAddress(); + final Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - RemoteManager rm = new DefaultRemoteManagerImplementation("TestRemoteManager", - hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000); - RemoteIdentifier id = rm.getMyIdentifier(); - System.out.println(id.toString()); - IdentifierFactory factory = new DefaultIdentifierFactory(); - Identifier newid = 
factory.getNewInstance(id.toString()); - System.out.println(newid.toString()); + try (final RemoteManager rm = new DefaultRemoteManagerImplementation("TestRemoteManager", + localAddressProvider.getLocalAddress(), port, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, + localAddressProvider)) { + final RemoteIdentifier id = rm.getMyIdentifier(); - Assert.assertTrue(id.equals(newid)); + final IdentifierFactory factory = new DefaultIdentifierFactory(); + final Identifier newId = factory.getNewInstance(id.toString()); - rm.close(); + Assert.assertEquals(id, newId); + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index 7e98229..888574f 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,11 +18,14 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; @@ -47,6 +50,12 @@ import java.util.logging.Level; public class RemoteManagerTest { + private final LocalAddressProvider localAddressProvider; + + public RemoteManagerTest() throws InjectionException { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + @Rule public final TestName name = new TestName(); @@ -68,10 +77,10 @@ public class RemoteManagerTest { clazzToCodecMap.put(TestEvent2.class, new ObjectSerializableCodec<TestEvent2>()); Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -170,10 +179,10 @@ public class RemoteManagerTest { clazzToCodecMap.put(TestEvent2.class, new 
ObjectSerializableCodec<TestEvent2>()); Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, localAddressProvider); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -213,10 +222,10 @@ public class RemoteManagerTest { clazzToCodecMap.put(TestEvent.class, new TestEventCodec()); Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = NetUtils.getLocalAddress(); + String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -250,12 +259,12 @@ public class RemoteManagerTest { clazzToCodecMap.put(TestEvent.class, new ObjectSerializableCodec<TestEvent>()); Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); ExceptionHandler errorHandler = new ExceptionHandler(monitor); try (final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, errorHandler, false, 3, 10000)) { + "name", hostAddress, port, codec, errorHandler, false, 3, 10000, 
localAddressProvider)) { RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -281,9 +290,9 @@ public class RemoteManagerTest { clazzToCodecMap.put(TestEvent2.class, new ObjectSerializableCodec<TestEvent1>()); Codec<?> codec = new MultiCodec<Object>(clazzToCodecMap); - String hostAddress = NetUtils.getLocalAddress(); + String hostAddress = localAddressProvider.getLocalAddress(); return new DefaultRemoteManagerImplementation(name, hostAddress, localPort, - codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout); + codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, localAddressProvider); } private class SendingRemoteManagerThread implements Callable<Integer> { @@ -304,7 +313,7 @@ public class RemoteManagerTest { Monitor monitor = new Monitor(); TimerStage timer = new TimerStage(new TimeoutHandler(monitor), timeout, timeout); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = localAddressProvider.getLocalAddress(); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java index 2e06b81..636315f 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache 
Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,6 +24,8 @@ import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.MultiEventHandler; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.*; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -41,12 +43,17 @@ import java.util.*; import java.util.logging.Level; public class RemoteTest { - + private final LocalAddressProvider localAddressProvider; @Rule public final TestName name = new TestName(); final String logPrefix = "TEST "; + + public RemoteTest() { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + @Test public void testRemoteEventCodec() throws UnknownHostException { System.out.println(logPrefix + name.getMethodName()); @@ -54,8 +61,8 @@ public class RemoteTest { ObjectSerializableCodec<TestEvent> codec = new ObjectSerializableCodec<TestEvent>(); RemoteEventCodec<TestEvent> reCodec = new RemoteEventCodec<TestEvent>(codec); - SocketAddress localAddr = new InetSocketAddress(NetUtils.getLocalAddress(), 8000); - SocketAddress remoteAddr = new InetSocketAddress(NetUtils.getLocalAddress(), 9000); + SocketAddress localAddr = new InetSocketAddress(this.localAddressProvider.getLocalAddress(), 8000); + SocketAddress remoteAddr = new InetSocketAddress(this.localAddressProvider.getLocalAddress(), 9000); RemoteEvent<TestEvent> e1 = new RemoteEvent<TestEvent>( localAddr, remoteAddr, "stage1", "stage2", 1, new TestEvent("hello", 0.0)); @@ -77,7 +84,7 @@ public class RemoteTest { // receiver stage final RemoteReceiverStage reRecvStage = new RemoteReceiverStage(null, null, 10); 
- String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport Transport transport1 = new NettyMessagingTransport(hostAddress, 0, reRecvStage, reRecvStage, 1, 10000); @@ -122,7 +129,7 @@ public class RemoteTest { final RemoteReceiverStage reRecvStage = new RemoteReceiverStage( new RemoteEventHandler(decoder, handler), new LoggingEventHandler<Throwable>(), 10); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java index 42050d7..fdced04 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/SmallMessagesTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,12 @@ import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.MultiEventHandler; import org.apache.reef.wake.impl.TimerStage; -import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.Decoder; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.RemoteIdentifier; +import org.apache.reef.wake.remote.RemoteIdentifierFactory; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.*; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -37,6 +42,11 @@ import java.util.*; import java.util.logging.Level; public class SmallMessagesTest { + private final LocalAddressProvider localAddressProvider; + + public SmallMessagesTest() { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } @Rule public final TestName name = new TestName(); @@ -74,7 +84,7 @@ public class SmallMessagesTest { final RemoteReceiverStage reRecvStage = new RemoteReceiverStage( new RemoteEventHandler(decoder, handler), null, 10); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); // transport Transport transport = new NettyMessagingTransport(hostAddress, port, reRecvStage, reRecvStage, 1, 10000); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java index 
1ec61ce..e12a517 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,23 +18,35 @@ */ package org.apache.reef.wake.test.remote; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.remote.*; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; +import javax.inject.Inject; import java.net.UnknownHostException; -public class TestRemote { +public class TestRemote implements Runnable { + private final LocalAddressProvider localAddressProvider; - public static void main(String[] args) { - String hostAddress = NetUtils.getLocalAddress(); + @Inject + public TestRemote() throws InjectionException { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + + @Override + public void run() { + final String hostAddress = localAddressProvider.getLocalAddress(); int myPort = 10011; int remotePort = 10001; Codec<TestEvent> codec = new TestEventCodec(); try (RemoteManager rm = new DefaultRemoteManagerImplementation("name", hostAddress, - myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000)) { + myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, localAddressProvider)) { // proxy handler RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); 
RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort); @@ -50,6 +62,10 @@ public class TestRemote { e.printStackTrace(); } } + + public static void main(String[] args) throws InjectionException { + Tang.Factory.getTang().newInjector().getInstance(TestRemote.class).run(); + } } class TestEventHandler implements EventHandler<RemoteMessage<TestEvent>> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java index 7b7cfde..bdb480d 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportRaceTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -24,7 +24,8 @@ import org.apache.reef.wake.impl.LoggingEventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.impl.TimerStage; -import org.apache.reef.wake.remote.NetUtils; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -39,6 +40,11 @@ import java.util.logging.Level; public class TransportRaceTest { + private final LocalAddressProvider localAddressProvider; + + public TransportRaceTest() { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } @Test public void testRace() throws Exception { @@ -52,7 +58,7 @@ public class TransportRaceTest { final ServerHandler serverHandler = new ServerHandler(monitor, msgsSent); EStage<TransportEvent> serverStage = new ThreadPoolStage<>("server@7001", serverHandler, 1, new LoggingEventHandler<Throwable>()); - String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); int port = 7001; NettyMessagingTransport transport = new NettyMessagingTransport( hostAddress, port, clientStage, serverStage, 1, 10000); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/a72a40f8/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java index 9c1d045..8176a70 100644 --- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TransportTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,7 +22,8 @@ import org.apache.reef.wake.EStage; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.TimerStage; import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.NetUtils; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.Link; @@ -42,6 +43,11 @@ import java.util.logging.Level; public class TransportTest { + private final LocalAddressProvider localAddressProvider; + + public TransportTest() { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } final String logPrefix = "TEST "; @Rule @@ -56,7 +62,8 @@ public class TransportTest { TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); final int expected = 2; - final String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); + ; final int port = 9100; // Codec<String> @@ -87,7 +94,7 @@ public class TransportTest { TimerStage timer = new TimerStage(new TimeoutHandler(monitor), 2000, 2000); final int expected = 2; - final String hostAddress = NetUtils.getLocalAddress(); + final String hostAddress = this.localAddressProvider.getLocalAddress(); final int port = 9100; // Codec<TestEvent>
