[java client] Cleanup AsyncKuduClient's unused caches Originally, asynchbase came with a few caches such as server-connection->region and region->server-connection. Via dozens of patches, we've reduced their usefulness to 0. This patch simply finishes the job and removes them.
FWIW client2tablets was always a source of pain, and the caching logic is now a lot easier to manage, and potentially refactor! Change-Id: I62802c34c618c83a4ff69d79825387cbe4ab51a8 Reviewed-on: http://gerrit.cloudera.org:8080/4705 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1746ae02 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1746ae02 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1746ae02 Branch: refs/heads/master Commit: 1746ae0210ef1340346edae7e154ab5254fccda8 Parents: fb9526f Author: Jean-Daniel Cryans <[email protected]> Authored: Wed Oct 12 16:07:57 2016 -0700 Committer: Jean-Daniel Cryans <[email protected]> Committed: Thu Oct 13 23:11:34 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/AsyncKuduClient.java | 98 ++++---------------- .../kudu/client/TestAsyncKuduSession.java | 4 +- 2 files changed, 22 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index 84e60de..cbb128e 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -142,43 +142,18 @@ public class AsyncKuduClient implements AutoCloseable { private final ClientSocketChannelFactory channelFactory; /** - * This map and the next 2 maps contain data cached from calls to the master's - * GetTableLocations RPC. There is no consistency guarantee across the maps. 
- * They are not updated all at the same time atomically. - * - * {@code tableLocations} is always the first to be updated because it's the - * map from which all the lookups are done in the fast-path of the requests - * that need to locate a tablet. {@code tablet2client} is updated second, - * because it comes second in the fast-path of every requests that need to - * locate a tablet. {@code client2tablets} is only used to handle TabletServer - * disconnections gracefully. - * - * This map is keyed by table ID. + * This map contains data cached from calls to the master's + * GetTableLocations RPC. This map is keyed by table ID. */ private final ConcurrentHashMap<String, TableLocationsCache> tableLocations = new ConcurrentHashMap<>(); /** - * Maps a tablet ID to the RemoteTablet that knows where all the replicas are served. - */ - private final ConcurrentHashMap<Slice, RemoteTablet> tablet2client = new ConcurrentHashMap<>(); - - /** - * Maps a client connected to a TabletServer to the list of tablets we know - * it's serving so far. - */ - @VisibleForTesting - final ConcurrentHashMap<TabletClient, ArrayList<RemoteTablet>> client2tablets = - new ConcurrentHashMap<>(); - - /** * Cache that maps a TabletServer address ("ip:port") to the clients * connected to it. * <p> * Access to this map must be synchronized by locking its monitor. - * Lock ordering: when locking both this map and a TabletClient, the - * TabletClient must always be locked first to avoid deadlocks. Logging - * the contents of this map (or calling toString) requires copying it first. + * Logging the contents of this map (or calling toString) requires copying it first. * <p> * This isn't a {@link ConcurrentHashMap} because we don't use it frequently * (just when connecting to / disconnecting from TabletClients) and when we @@ -197,11 +172,12 @@ public class AsyncKuduClient implements AutoCloseable { * that are going to cause unnecessary errors. 
* @see TabletClientPipeline#handleDisconnect */ - private final HashMap<String, TabletClient> ip2client = - new HashMap<String, TabletClient>(); + @VisibleForTesting + @GuardedBy("ip2client") + final HashMap<String, TabletClient> ip2client = new HashMap<>(); @GuardedBy("sessions") - private final Set<AsyncKuduSession> sessions = new HashSet<AsyncKuduSession>(); + private final Set<AsyncKuduSession> sessions = new HashSet<>(); // Since the masters also go through TabletClient, we need to treat them as if they were a normal // table. We'll use the following fake table name to identify places where we need special @@ -219,9 +195,12 @@ public class AsyncKuduClient implements AutoCloseable { */ private long lastPropagatedTimestamp = NO_TIMESTAMP; - // A table is considered not served when we get an empty list of locations but know - // that a tablet exists. This is currently only used for new tables. The objects stored are - // table IDs. + /** + * A table is considered not served when we get a TABLET_NOT_RUNNING error from the master + * after calling GetTableLocations (it means that some tablets aren't ready to serve yet). + * We cache this information so that concurrent RPCs sent just after creating a table don't + * all try to hit the master for no good reason. + */ private final Set<String> tablesNotServed = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); @@ -989,8 +968,7 @@ public class AsyncKuduClient implements AutoCloseable { } /** - * Clears {@link #tableLocations} and {@link #tablet2client} of the table's - * entries. + * Clears {@link #tableLocations} of the table's entries. * * This method makes the maps momentarily inconsistent, and should only be * used when the {@code AsyncKuduClient} is in a steady state. 
@@ -1000,12 +978,6 @@ public class AsyncKuduClient implements AutoCloseable { @VisibleForTesting void emptyTabletsCacheForTable(String tableId) { tableLocations.remove(tableId); - Set<Map.Entry<Slice, RemoteTablet>> tablets = tablet2client.entrySet(); - for (Map.Entry<Slice, RemoteTablet> entry : tablets) { - if (entry.getValue().getTableId().equals(tableId)) { - tablets.remove(entry); - } - } } TabletClient clientFor(RemoteTablet tablet) { @@ -1384,26 +1356,10 @@ public class AsyncKuduClient implements AutoCloseable { // already discovered the tablet, its locations are refreshed. List<RemoteTablet> tablets = new ArrayList<>(locations.size()); for (Master.TabletLocationsPB tabletPb : locations) { - // Early creating the tablet so that it parses out the pb. RemoteTablet rt = createTabletFromPb(tableId, tabletPb); Slice tabletId = rt.tabletId; - // If we already know about this tablet, refresh the locations. - RemoteTablet currentTablet = tablet2client.get(tabletId); - if (currentTablet != null) { - currentTablet.refreshTabletClients(tabletPb); - tablets.add(currentTablet); - continue; - } - - // Putting it here first doesn't make it visible because tabletsCache is always looked up - // first. - RemoteTablet oldRt = tablet2client.putIfAbsent(tabletId, rt); - if (oldRt != null) { - // someone beat us to it - continue; - } - LOG.info("Discovered tablet {} for table '{}' with partition {}", + LOG.info("Learned about tablet {} for table '{}' with partition {}", tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition()); rt.refreshTabletClients(tabletPb); tablets.add(rt); @@ -1546,9 +1502,6 @@ public class AsyncKuduClient implements AutoCloseable { chan = channelFactory.newChannel(pipeline); TabletClient oldClient = ip2client.put(hostport, client); assert oldClient == null; - - // The client2tables map is assumed to contain `client` after it is published in ip2client. 
- this.client2tablets.put(client, new ArrayList<RemoteTablet>()); } final SocketChannelConfig config = chan.getConfig(); config.setConnectTimeoutMillis(5000); @@ -1792,7 +1745,6 @@ public class AsyncKuduClient implements AutoCloseable { TabletClient old; synchronized (ip2client) { old = ip2client.remove(hostport); - client2tablets.remove(client); } LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}"); @@ -2050,8 +2002,8 @@ public class AsyncKuduClient implements AutoCloseable { replicas.set(replicasBuilder.build()); } - // Must be called with tabletServers synchronized - void addTabletClient(String uuid, String host, int port, boolean isLeader) + @GuardedBy("tabletServers") + private void addTabletClient(String uuid, String host, int port, boolean isLeader) throws UnknownHostException { String ip = getIP(host); if (ip == null) { @@ -2059,19 +2011,9 @@ public class AsyncKuduClient implements AutoCloseable { } TabletClient client = newClient(uuid, ip, port); - ArrayList<RemoteTablet> tablets = client2tablets.get(client); - - if (tablets == null) { - // We lost a race, someone removed the client we received. 
- return; - } - - synchronized (tablets) { - tabletServers.add(client); - if (isLeader) { - leaderIndex = tabletServers.size() - 1; - } - tablets.add(this); + tabletServers.add(client); + if (isLeader) { + leaderIndex = tabletServers.size() - 1; } } http://git-wip-us.apache.org/repos/asf/kudu/blob/1746ae02/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java index 15ce59b..2de16fd 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java @@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest { session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join(); - int numClientsBefore = client.client2tablets.size(); + int numClientsBefore = client.ip2client.size(); // Restart all the tablet servers. killTabletServers(); @@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest { session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join(); // We should not have leaked an entry in the client2tablets map. - int numClientsAfter = client.client2tablets.size(); + int numClientsAfter = client.ip2client.size(); assertEquals(numClientsBefore, numClientsAfter); } finally { restartTabletServers();
