[FLINK-4418] [client] Improve resilience when InetAddress.getLocalHost() throws UnknownHostException
- If InetAddress.getLocalHost() throws UnknownHostException when attempting to connect with LOCAL_HOST strategy, the code will move on to try the other strategies instead of immediately failing.
- Also made minor code style improvements for trying the different strategies.

This closes #2383

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f5a8cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f5a8cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f5a8cc

Branch: refs/heads/master
Commit: 53f5a8cc572afca617eca5ae128bb42073e9fcb2
Parents: 6a456c6
Author: Shannon Carey <[email protected]>
Authored: Wed Aug 17 19:35:49 2016 -0500
Committer: Stephan Ewen <[email protected]>
Committed: Fri Aug 26 17:53:19 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/net/ConnectionUtils.java | 46 ++++++++---------
 .../flink/runtime/net/ConnectionUtilsTest.java | 53 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 77324fa..dcf5a62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -24,7 +24,11 @@
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
+import 
java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -37,6 +41,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; + /** * Utilities to determine the network interface and address that should be used to bind the * TaskManager communication to. @@ -110,40 +115,27 @@ public class ConnectionUtils { long currentSleepTime = MIN_SLEEP_TIME; long elapsedTime = 0; + final List<AddressDetectionState> strategies = Collections.unmodifiableList( + Arrays.asList( + AddressDetectionState.LOCAL_HOST, + AddressDetectionState.ADDRESS, + AddressDetectionState.FAST_CONNECT, + AddressDetectionState.SLOW_CONNECT)); + // loop while there is time left while (elapsedTime < maxWaitMillis) { - AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST; - boolean logging = elapsedTime >= startLoggingAfter; if (logging) { LOG.info("Trying to connect to " + targetAddress); } - // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT - do { + + // Try each strategy in order + for (AddressDetectionState strategy : strategies) { InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging); if (address != null) { return address; } - - // pick the next strategy - switch (strategy) { - case LOCAL_HOST: - strategy = AddressDetectionState.ADDRESS; - break; - case ADDRESS: - strategy = AddressDetectionState.FAST_CONNECT; - break; - case FAST_CONNECT: - strategy = AddressDetectionState.SLOW_CONNECT; - break; - case SLOW_CONNECT: - strategy = null; - break; - default: - throw new RuntimeException("Unsupported strategy: " + strategy); - } } - while (strategy != null); // we have made a pass with all strategies over all interfaces // sleep for a while before we make the next pass @@ -229,7 +221,13 @@ public class ConnectionUtils { { // try LOCAL_HOST strategy independent of the network interfaces if (strategy == AddressDetectionState.LOCAL_HOST) { - InetAddress localhostName = InetAddress.getLocalHost(); + 
InetAddress localhostName; + try { + localhostName = InetAddress.getLocalHost(); + } catch (UnknownHostException uhe) { + LOG.warn("Could not resolve local hostname to an IP address: {}", uhe.getMessage()); + return null; + } if (tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) { LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address"); http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java index 570f87c..13a8214 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java @@ -17,17 +17,30 @@ */ package org.apache.flink.runtime.net; -import static org.junit.Assert.*; - -import org.junit.Test; - +import java.io.IOException; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.net.UnknownHostException; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for the network utilities. 
*/ +@RunWith(PowerMockRunner.class) +@PrepareForTest(ConnectionUtils.class) public class ConnectionUtilsTest { @Test @@ -55,4 +68,36 @@ public class ConnectionUtilsTest { fail(e.getMessage()); } } + + @Test + public void testFindConnectingAddressWhenGetLocalHostThrows() throws Exception { + PowerMockito.mockStatic(InetAddress.class); + Mockito.when(InetAddress.getLocalHost()).thenThrow(new UnknownHostException()).thenCallRealMethod(); + + final InetAddress loopbackAddress = Inet4Address.getByName("127.0.0.1"); + Thread socketServerThread; + try (ServerSocket socket = new ServerSocket(0, 1, loopbackAddress)) { + // Make sure that the thread will eventually die even if something else goes wrong + socket.setSoTimeout(10_000); + socketServerThread = new Thread(new Runnable() { + @Override + public void run() { + try { + socket.accept(); + } catch (IOException e) { + // ignore + } + } + }); + socketServerThread.start(); + + final InetSocketAddress socketAddress = new InetSocketAddress(loopbackAddress, socket.getLocalPort()); + final InetAddress address = ConnectionUtils.findConnectingAddress( + socketAddress, 2000, 400); + + PowerMockito.verifyStatic(); + // Make sure we got an address via alternative means + assertNotNull(address); + } + } }
