[java client] Refactor all server info into a single class, add locality Having this helps propagate information around the client. RemoteTablet can now answer requests for local servers.
This patch also tries to simplify how RemoteTablets are created, as it's getting a bit messy and redundant with the passing of ServerInfos. Change-Id: I33984a437d8c8d07d5db4d16f8da723b3e904189 Reviewed-on: http://gerrit.cloudera.org:8080/4836 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a326fc95 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a326fc95 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a326fc95 Branch: refs/heads/master Commit: a326fc950681c7187c096b5899186932e6e8d080 Parents: 96c2e2a Author: Jean-Daniel Cryans <[email protected]> Authored: Mon Oct 24 10:17:36 2016 -0700 Committer: Jean-Daniel Cryans <[email protected]> Committed: Thu Oct 27 18:18:13 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 19 ++--- .../org/apache/kudu/client/ConnectionCache.java | 27 ++++--- .../org/apache/kudu/client/RemoteTablet.java | 54 +++++++++----- .../java/org/apache/kudu/client/ServerInfo.java | 78 ++++++++++++++++++++ .../org/apache/kudu/client/TabletClient.java | 56 +++----------- .../apache/kudu/client/TestConnectionCache.java | 3 +- .../apache/kudu/client/TestRemoteTablet.java | 29 +++++++- 7 files changed, 178 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index a781f9f..b36737b 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -1137,7 +1137,7 @@ public class AsyncKuduClient implements AutoCloseable { * a RPC, so we need to demote it and retry. */ <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) { - rpc.getTablet().demoteLeader(server.getUuid()); + rpc.getTablet().demoteLeader(server.getServerInfo().getUuid()); handleRetryableError(rpc, ex); } @@ -1182,9 +1182,9 @@ public class AsyncKuduClient implements AutoCloseable { * the tablet itself from the caches. */ private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) { - LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " + - tablet.getTabletId()); - tablet.removeTabletClient(server.getUuid()); + String uuid = server.getServerInfo().getUuid(); + LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId()); + tablet.removeTabletClient(uuid); } /** Callback executed when a master lookup completes. */ @@ -1275,12 +1275,14 @@ public class AsyncKuduClient implements AutoCloseable { List<RemoteTablet> tablets = new ArrayList<>(locations.size()); for (Master.TabletLocationsPB tabletPb : locations) { - String tabletId = tabletPb.getTabletId().toStringUtf8(); - List<UnknownHostException> lookupExceptions = new ArrayList<>(tabletPb.getReplicasCount()); + List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount()); for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) { try { - connectionCache.connectTS(replica.getTsInfo()); + ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo()); + if (serverInfo != null) { + servers.add(serverInfo); + } } catch (UnknownHostException ex) { lookupExceptions.add(ex); } @@ -1293,8 +1295,7 @@ public class AsyncKuduClient implements AutoCloseable { throw new NonRecoverableException(statusIOE); } - Partition partition = ProtobufHelper.pbToPartition(tabletPb.getPartition()); - RemoteTablet rt = new RemoteTablet(tableId, tabletId, partition, tabletPb); + RemoteTablet rt = new RemoteTablet(tableId, tabletPb, servers); LOG.info("Learned about tablet {} for table '{}' with partition {}", rt.getTabletId(), tableName, rt.getPartition()); http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java index ed6100e..9d8f1d0 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java @@ -92,14 +92,15 @@ class ConnectionCache { /** * Create a connection to a tablet server based on information provided by the master. * @param tsInfoPB master-provided information for the tablet server + * @return an object that contains all the server's information * @throws UnknownHostException if we cannot resolve the tablet server's IP address */ - void connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException { + ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException { List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList(); String uuid = tsInfoPB.getPermanentUuid().toStringUtf8(); if (addresses.isEmpty()) { LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid); - return; + return null; } // from meta_cache.cc @@ -110,29 +111,30 @@ class ConnectionCache { throw new UnknownHostException( "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'"); } - newClient(uuid, inetAddress, addresses.get(0).getPort()); + return newClient(uuid, inetAddress, addresses.get(0).getPort()).getServerInfo(); } TabletClient newClient(String uuid, InetAddress inetAddress, int port) { String host = inetAddress.getHostAddress(); boolean isLocal = NetUtil.isLocalAddress(inetAddress); - return newClient(uuid, host, port, isLocal); + ServerInfo serverInfo = new ServerInfo(uuid, host, port, isLocal); + return newClient(serverInfo); } - TabletClient newClient(String uuid, String host, int port, boolean isLocal) { + TabletClient newClient(ServerInfo serverInfo) { TabletClient client; SocketChannel chan; writeLock.lock(); try { - client = uuid2client.get(uuid); + client = uuid2client.get(serverInfo.getUuid()); if (client != null && client.isAlive()) { return client; } final TabletClientPipeline pipeline = new TabletClientPipeline(); - client = pipeline.init(uuid, host, port, isLocal); + client = pipeline.init(serverInfo); chan = this.kuduClient.getChannelFactory().newChannel(pipeline); - uuid2client.put(uuid, client); + uuid2client.put(serverInfo.getUuid(), client); } finally { writeLock.unlock(); } @@ -144,7 +146,8 @@ class ConnectionCache { // Java since the JRE doesn't expose any way to call setsockopt() with // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. config.setKeepAlive(true); - chan.connect(new InetSocketAddress(host, port)); // Won't block. + chan.connect( + new InetSocketAddress(serverInfo.getHostname(), serverInfo.getPort())); // Won't block. return client; } @@ -178,7 +181,7 @@ class ConnectionCache { } else if (client.isAlive()) { return client; } else { - return newClient(uuid, client.getHost(), client.getPort(), client.isLocal()); + return newClient(client.getServerInfo()); } } @@ -235,9 +238,9 @@ class ConnectionCache { private final class TabletClientPipeline extends DefaultChannelPipeline { - TabletClient init(String uuid, String host, int port, boolean isLocal) { + TabletClient init(ServerInfo serverInfo) { AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient; - final TabletClient client = new TabletClient(kuduClient, uuid, host, port, isLocal); + final TabletClient client = new TabletClient(kuduClient, serverInfo); if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) { super.addLast("timeout-handler", new ReadTimeoutHandler(kuduClient.getTimer(), http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java index 07936c8..28be0fe 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java @@ -27,16 +27,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; -import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** * This class encapsulates the information regarding a tablet and its locations. * <p> - * RemoteTablet's main function, once it is init()'d, is to keep track of where the leader for this + * RemoteTablet's main function is to keep track of where the leader for this * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find * it's not the leader anymore, and then call {@link #demoteLeader(String)}. * <p> @@ -53,7 +52,7 @@ class RemoteTablet implements Comparable<RemoteTablet> { private final String tableId; private final String tabletId; @GuardedBy("tabletServers") - private final Set<String> tabletServers = new HashSet<>(); + private final Map<String, ServerInfo> tabletServers; private final AtomicReference<List<LocatedTablet.Replica>> replicas = new AtomicReference(ImmutableList.of()); private final Partition partition; @@ -61,15 +60,22 @@ class RemoteTablet implements Comparable<RemoteTablet> { @GuardedBy("tabletServers") private String leaderUuid; - RemoteTablet(String tableId, String tabletId, - Partition partition, Master.TabletLocationsPB tabletLocations) { - this.tabletId = tabletId; + RemoteTablet(String tableId, + Master.TabletLocationsPB tabletLocations, + List<ServerInfo> serverInfos) { + this.tabletId = tabletLocations.getTabletId().toStringUtf8(); this.tableId = tableId; - this.partition = partition; + this.partition = ProtobufHelper.pbToPartition(tabletLocations.getPartition()); + this.tabletServers = new HashMap<>(serverInfos.size()); + for (ServerInfo serverInfo : serverInfos) { + this.tabletServers.put(serverInfo.getUuid(), serverInfo); + } + + ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>(); for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) { String uuid = replica.getTsInfo().getPermanentUuid().toStringUtf8(); - tabletServers.add(uuid); + replicasBuilder.add(new LocatedTablet.Replica(replica)); if (replica.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) { leaderUuid = uuid; } @@ -78,11 +84,6 @@ class RemoteTablet implements Comparable<RemoteTablet> { if (leaderUuid == null) { LOG.warn("No leader provided for tablet {}", getTabletId()); } - - ImmutableList.Builder<LocatedTablet.Replica> replicasBuilder = new ImmutableList.Builder<>(); - for (Master.TabletLocationsPB.ReplicaPB replica : tabletLocations.getReplicasList()) { - replicasBuilder.add(new LocatedTablet.Replica(replica)); - } replicas.set(replicasBuilder.build()); } @@ -101,7 +102,7 @@ class RemoteTablet implements Comparable<RemoteTablet> { if (leaderUuid != null && leaderUuid.equals(uuid)) { leaderUuid = null; } - if (tabletServers.remove(uuid)) { + if (tabletServers.remove(uuid) != null) { return true; } LOG.debug("tablet {} already removed ts {}, size left is {}", @@ -135,7 +136,7 @@ class RemoteTablet implements Comparable<RemoteTablet> { } /** - * Get the UUID of the tablet server that we think holds the leader replica for this tablet. + * Gets the UUID of the tablet server that we think holds the leader replica for this tablet. * @return a UUID of a tablet server that we think has the leader, else null */ String getLeaderUUID() { @@ -145,6 +146,25 @@ class RemoteTablet implements Comparable<RemoteTablet> { } /** + * Gets the UUID of the closest server. If none is closer than the others, returns a random + * server UUID. + * @return the UUID of the closest server, which might be any if none is closer, or null if this + * cache doesn't know of any servers + */ + String getClosestUUID() { + synchronized (tabletServers) { + String lastUuid = null; + for (ServerInfo serverInfo : tabletServers.values()) { + lastUuid = serverInfo.getUuid(); + if (serverInfo.isLocal()) { + return serverInfo.getUuid(); + } + } + return lastUuid; + } + } + + /** * Gets the replicas of this tablet. The returned list may not be mutated. * @return the replicas of the tablet */ http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java new file mode 100644 index 0000000..999f7f2 --- /dev/null +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.kudu.client; + +import org.apache.kudu.annotations.InterfaceAudience; + +/** + * Container class for server information that never changes, like UUID and hostname. + */ [email protected] +public class ServerInfo { + private final String uuid; + private final String hostname; + private final int port; + private final boolean local; + + /** + * Constructor for all the fields. The intent is that there should only be one ServerInfo + * instance per UUID the client is connected to. + * @param uuid server's UUID + * @param hostname server's hostname, only one of them + * @param port server's port + * @param local if the server is hosted on the same machine where this client is running + */ + public ServerInfo(String uuid, String hostname, int port, boolean local) { + this.uuid = uuid; + this.hostname = hostname; + this.port = port; + this.local = local; + } + + /** + * Returns this server's uuid. + * @return a string that contains this server's uuid + */ + public String getUuid() { + return uuid; + } + + /** + * Returns this server's hostname. We might get many hostnames from the master for a single + * TS, and this is the one we picked to connect to originally. + * @return a string that contains this server's hostname + */ + public String getHostname() { + return hostname; + } + + /** + * Returns this server's port. + * @return a port number that this server is bound to + */ + public int getPort() { + return port; + } + + /** + * Returns if this server is on this client's host. + * @return true if the server is local, else false + */ + public boolean isLocal() { + return local; + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java index 0b190ed..822fac6 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java @@ -133,32 +133,23 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { private final AsyncKuduClient kuduClient; - private final String uuid; - - private final String host; - - private final int port; - private final long socketReadTimeoutMs; private SecureRpcHelper secureRpcHelper; private final RequestTracker requestTracker; + private final ServerInfo serverInfo; + // If an uncaught exception forced us to shutdown this TabletClient, we'll handle the retry // differently by also clearing the caches. private volatile boolean gotUncaughtException = false; - private final boolean local; - - public TabletClient(AsyncKuduClient client, String uuid, String host, int port, boolean local) { + public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) { this.kuduClient = client; - this.uuid = uuid; this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs(); - this.host = host; - this.port = port; this.requestTracker = client.getRequestTracker(); - this.local = local; + this.serverInfo = serverInfo; } <R> void sendRpc(KuduRpc<R> rpc) { @@ -449,7 +440,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { } } else { try { - decoded = rpc.deserialize(response, this.uuid); + decoded = rpc.deserialize(response, this.serverInfo.getUuid()); } catch (Exception ex) { exception = ex; } @@ -832,40 +823,11 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { } private String getPeerUuidLoggingString() { - return "[Peer " + uuid + "] "; + return "[Peer " + serverInfo.getUuid() + "] "; } - /** - * Returns this tablet server's uuid. - * @return a string that contains this tablet server's uuid - */ - String getUuid() { - return uuid; - } - - /** - * Returns if this server is on this client's host. - * @return true if the server is local, else false - */ - boolean isLocal() { - return local; - } - - /** - * Returns this tablet server's port. - * @return a port number that this tablet server is bound to - */ - int getPort() { - return port; - } - - /** - * Returns this tablet server's hostname. We might get many hostnames from the master for a single - * TS, and this is the one we picked to connect to originally. - * @returna string that contains this tablet server's hostname - */ - String getHost() { - return host; + ServerInfo getServerInfo() { + return serverInfo; } public String toString() { @@ -875,7 +837,7 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> { .append("(chan=") // = 6 .append(chan) // ~64 (up to 66 when using IPv4) .append(", uuid=") // = 7 - .append(uuid) // = 32 + .append(serverInfo.getUuid()) // = 32 .append(", #pending_rpcs="); // =16 int npending_rpcs; synchronized (this) { http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java index e238069..334dd38 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java @@ -39,7 +39,8 @@ public class TestConnectionCache { ConnectionCache cache = new ConnectionCache(client); int i = 0; for (HostAndPort hp : addresses) { - TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort(), false); + TabletClient conn = + cache.newClient(new ServerInfo(i + "", hp.getHostText(), hp.getPort(), false)); // Ping the process so we go through the whole connection process. pingConnection(conn); i++; http://git-wip-us.apache.org/repos/asf/kudu/blob/a326fc95/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java index 5cf4de8..aa50ad6 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java @@ -21,6 +21,9 @@ import org.apache.kudu.consensus.Metadata; import org.apache.kudu.master.Master; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.*; public class TestRemoteTablet { @@ -83,17 +86,39 @@ public class TestRemoteTablet { assertTrue(tablet.removeTabletClient("2")); } + @Test + public void testLocalReplica() { + RemoteTablet tablet = getTablet(0, 0); + + assertEquals("0", tablet.getClosestUUID()); + } + + @Test + public void testNoLocalReplica() { + RemoteTablet tablet = getTablet(0, -1); + + // We just care about getting one back. + assertNotNull(tablet.getClosestUUID()); + } + private RemoteTablet getTablet(int leaderIndex) { + return getTablet(leaderIndex, -1); + } + + private RemoteTablet getTablet(int leaderIndex, int localReplicaIndex) { Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder(); tabletPb.setPartition(TestUtils.getFakePartitionPB()); tabletPb.setTabletId(ByteString.copyFromUtf8("fake tablet")); + List<ServerInfo> servers = new ArrayList<>(); for (int i = 0; i < 3; i++) { + String uuid = i + ""; + servers.add(new ServerInfo(uuid, "host", i, i == localReplicaIndex)); tabletPb.addReplicas(TestUtils.getFakeTabletReplicaPB( - i + "", "host", i, + uuid, "host", i, leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER)); } - return new RemoteTablet("fake table", "fake tablet", null, tabletPb.build()); + return new RemoteTablet("fake table", tabletPb.build(), servers); } }
