Repository: kudu Updated Branches: refs/heads/master 1a8ce4269 -> ff24651cb
[java client] Identify client-local TabletClients This patch adds building blocks to enable selecting the closest replica. Another patch will be required to finish the plumbing job. Inspired by Hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java#L698 Change-Id: Ia1bfcbc6b6aea7cb9610e8ae934bf08ab0774ee3 Reviewed-on: http://gerrit.cloudera.org:8080/4786 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ff24651c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ff24651c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ff24651c Branch: refs/heads/master Commit: ff24651cb9cb7f78ee2806b8624e61fed132b516 Parents: 1a8ce42 Author: Jean-Daniel Cryans <[email protected]> Authored: Fri Oct 21 15:30:39 2016 -0700 Committer: Jean-Daniel Cryans <[email protected]> Committed: Mon Oct 24 23:16:54 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 7 +-- .../org/apache/kudu/client/ConnectionCache.java | 52 ++++++------------- .../org/apache/kudu/client/TabletClient.java | 13 ++++- .../main/java/org/apache/kudu/util/NetUtil.java | 54 ++++++++++++++++++++ .../apache/kudu/client/TestConnectionCache.java | 2 +- .../java/org/apache/kudu/util/TestNetUtil.java | 12 ++++- 6 files changed, 97 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index f455c0e..a781f9f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -53,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; +import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -1405,8 +1406,8 @@ public class AsyncKuduClient implements AutoCloseable { * @return A live and initialized client for the specified master server. */ TabletClient newMasterClient(HostAndPort masterHostPort) { - String ip = ConnectionCache.getIP(masterHostPort.getHostText()); - if (ip == null) { + InetAddress inetAddress = NetUtil.getInetAddress((masterHostPort.getHostText())); + if (inetAddress == null) { return null; } // We should pass a UUID here but we have a chicken and egg problem, we first need to @@ -1415,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable { // host and port which is enough to identify the node we're connecting to. return connectionCache.newClient( MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(), - ip, masterHostPort.getPort()); + inetAddress, masterHostPort.getPort()); } /** http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java index 9a885fb..ed6100e 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -23,6 +23,7 @@ import org.apache.kudu.Common; import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.annotations.InterfaceStability; import org.apache.kudu.master.Master; +import org.apache.kudu.util.NetUtil; import org.jboss.netty.channel.DefaultChannelPipeline; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannelConfig; @@ -104,15 +105,21 @@ class ConnectionCache { // from meta_cache.cc // TODO: if the TS advertises multiple host/ports, pick the right one // based on some kind of policy. For now just use the first always. - String ip = getIP(addresses.get(0).getHost()); - if (ip == null) { + InetAddress inetAddress = NetUtil.getInetAddress(addresses.get(0).getHost()); + if (inetAddress == null) { throw new UnknownHostException( "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'"); } - newClient(uuid, ip, addresses.get(0).getPort()); + newClient(uuid, inetAddress, addresses.get(0).getPort()); } - TabletClient newClient(String uuid, final String host, final int port) { + TabletClient newClient(String uuid, InetAddress inetAddress, int port) { + String host = inetAddress.getHostAddress(); + boolean isLocal = NetUtil.isLocalAddress(inetAddress); + return newClient(uuid, host, port, isLocal); + } + + TabletClient newClient(String uuid, String host, int port, boolean isLocal) { TabletClient client; SocketChannel chan; @@ -123,7 +130,7 @@ class ConnectionCache { return client; } final TabletClientPipeline pipeline = new TabletClientPipeline(); - client = pipeline.init(uuid, host, port); + client = pipeline.init(uuid, host, port, isLocal); chan = this.kuduClient.getChannelFactory().newChannel(pipeline); uuid2client.put(uuid, client); } finally { @@ -159,7 +166,7 @@ class ConnectionCache { /** * Get a connection to a server for the given UUID. This method will automatically call - * {@link #newClient(String, String, int)} if the cached connection is down. + * {@link #newClient(String, InetAddress, int)} if the cached connection is down. * @param uuid server's identifier * @return a connection to a server, or null if the passed UUID isn't known */ @@ -171,7 +178,7 @@ class ConnectionCache { } else if (client.isAlive()) { return client; } else { - return newClient(uuid, client.getHost(), client.getPort()); + return newClient(uuid, client.getHost(), client.getPort(), client.isLocal()); } } @@ -226,38 +233,11 @@ class ConnectionCache { return true; } - /** - * Gets a hostname or an IP address and returns the textual representation - * of the IP address. - * <p> - * <strong>This method can block</strong> as there is no API for - * asynchronous DNS resolution in the JDK. - * @param host the hostname to resolve - * @return the IP address associated with the given hostname, - * or {@code null} if the address couldn't be resolved - */ - static String getIP(final String host) { - final long start = System.nanoTime(); - try { - final String ip = InetAddress.getByName(host).getHostAddress(); - final long latency = System.nanoTime() - start; - if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) { - LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency); - } else if (latency >= 3000000/*ns*/) { - LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency); - } - return ip; - } catch (UnknownHostException e) { - LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start)); - return null; - } - } - private final class TabletClientPipeline extends DefaultChannelPipeline { - TabletClient init(String uuid, String host, int port) { + TabletClient init(String uuid, String host, int port, boolean isLocal) { AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient; - final TabletClient client = new TabletClient(kuduClient, uuid, host, port); + final TabletClient client = new TabletClient(kuduClient, uuid, host, port, isLocal); if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) { super.addLast("timeout-handler", new ReadTimeoutHandler(kuduClient.getTimer(), http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java index f5bb44c..0b190ed 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java @@ -149,13 +149,16 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { // differently by also clearing the caches. private volatile boolean gotUncaughtException = false; - public TabletClient(AsyncKuduClient client, String uuid, String host, int port) { + private final boolean local; + + public TabletClient(AsyncKuduClient client, String uuid, String host, int port, boolean local) { this.kuduClient = client; this.uuid = uuid; this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs(); this.host = host; this.port = port; this.requestTracker = client.getRequestTracker(); + this.local = local; } <R> void sendRpc(KuduRpc<R> rpc) { @@ -841,6 +844,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { } /** + * Returns if this server is on this client's host. + * @return true if the server is local, else false + */ + boolean isLocal() { + return local; + } + + /** * Returns this tablet server's port. * @return a port number that this tablet server is bound to */ http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java index 589cb8f..4ca6d8c 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java @@ -23,7 +23,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; import org.apache.kudu.annotations.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.net.UnknownHostException; import java.util.List; /** @@ -32,6 +38,8 @@ import java.util.List; @InterfaceAudience.Private public class NetUtil { + private static final Logger LOG = LoggerFactory.getLogger(NetUtil.class); + /** * Convert a list of {@link HostAndPort} objects to a comma separate string. * The inverse of {@link #parseStrings(String, int)}. @@ -75,4 +83,50 @@ public class NetUtil { } return hostsAndPorts; } + + /** + * Gets a hostname or an IP address and returns an InetAddress. + * <p> + * <strong>This method can block</strong> as there is no API for + * asynchronous DNS resolution in the JDK. + * @param host the hostname to resolve + * @return an InetAddress for the given hostname, + * or {@code null} if the address couldn't be resolved + */ + public static InetAddress getInetAddress(final String host) { + final long start = System.nanoTime(); + try { + InetAddress ip = InetAddress.getByName(host); + long latency = System.nanoTime() - start; + if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) { + LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency); + } else if (latency >= 3000000/*ns*/) { + LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency); + } + return ip; + } catch (UnknownHostException e) { + LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start)); + return null; + } + } + + /** + * Given an InetAddress, checks to see if the address is a local address, by + * comparing the address with all the interfaces on the node. + * @param addr address to check if it is local node's address + * @return true if the address corresponds to the local node + */ + public static boolean isLocalAddress(InetAddress addr) { + // Check if the address is any local or loopback. + boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress(); + + // Check if the address is defined on any interface. + if (!local) { + try { + local = NetworkInterface.getByInetAddress(addr) != null; + } catch (SocketException e) { + } + } + return local; + } } http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java index 3776d50..e238069 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java @@ -39,7 +39,7 @@ public class TestConnectionCache { ConnectionCache cache = new ConnectionCache(client); int i = 0; for (HostAndPort hp : addresses) { - TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort()); + TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort(), false); // Ping the process so we go through the whole connection process. pingConnection(conn); i++; http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java index cc88a3f..aa9f4b1 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java @@ -19,11 +19,11 @@ package org.apache.kudu.util; import com.google.common.net.HostAndPort; import org.junit.Test; +import java.net.InetAddress; import java.util.Arrays; import java.util.List; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; /** * Test for {@link NetUtil}. @@ -70,4 +70,12 @@ public class TestNetUtil { ); assertEquals(NetUtil.hostsAndPortsToString(hostsAndPorts), "127.0.0.1:1111,1.2.3.4.5:0"); } + + @Test + public void testLocal() throws Exception { + assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("localhost"))); + assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("127.0.0.1"))); + assertTrue(NetUtil.isLocalAddress(InetAddress.getLocalHost())); + assertFalse(NetUtil.isLocalAddress(NetUtil.getInetAddress("kudu.apache.org"))); + } }
