Repository: phoenix Updated Branches: refs/heads/master 247ac8eee -> 4cd176e5f
PHOENIX-1090 - Fix HTable leak in ServerCacheClient (SamarthJain) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4cd176e5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4cd176e5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4cd176e5 Branch: refs/heads/master Commit: 4cd176e5fd784a609ab5fb3a2d29460247d9b96a Parents: 247ac8e Author: Mujtaba <[email protected]> Authored: Tue Jul 15 14:41:05 2014 -0700 Committer: Mujtaba <[email protected]> Committed: Tue Jul 15 14:41:05 2014 -0700 ---------------------------------------------------------------------- .../apache/phoenix/cache/ServerCacheClient.java | 100 ++++++++++--------- 1 file changed, 53 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4cd176e5/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 4c098c4..301c452 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.cache; +import static com.google.common.io.Closeables.closeQuietly; + import java.io.Closeable; import java.io.IOException; import java.sql.SQLException; @@ -268,53 +270,57 @@ public class ServerCacheClient { * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added */ private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException { - ConnectionQueryServices services = connection.getQueryServices(); - Throwable lastThrowable = null; - TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); - byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); - HTableInterface iterateOverTable = services.getTable(tableName); - List<HRegionLocation> locations = services.getAllTableRegions(tableName); - Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers); - /** - * Allow for the possibility that the region we based where to send our cache has split and been - * relocated to another region server *after* we sent it, but before we removed it. To accommodate - * this, we iterate through the current metadata boundaries and remove the cache once for each - * server that we originally sent to. - */ - if (LOG.isDebugEnabled()) {LOG.debug("Removing Cache " + cacheId + " from servers.");} - for (HRegionLocation entry : locations) { - if (remainingOnServers.contains(entry)) { // Call once per server - try { - byte[] key = entry.getRegionInfo().getStartKey(); - iterateOverTable.coprocessorService(ServerCachingService.class, key, key, - new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() { - @Override - public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = - new BlockingRpcCallback<RemoveServerCacheResponse>(); - RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder(); - if(connection.getTenantId() != null){ - builder.setTenantId(HBaseZeroCopyByteString.wrap(connection.getTenantId().getBytes())); - } - builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId)); - instance.removeServerCache(controller, builder.build(), rpcCallback); - if(controller.getFailedOn() != null) { - throw controller.getFailedOn(); - } - return rpcCallback.get(); - } - }); - remainingOnServers.remove(entry); - } catch (Throwable t) { - lastThrowable = t; - LOG.error("Error trying to remove hash cache for " + entry, t); - } - } - } - if (!remainingOnServers.isEmpty()) { - LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable); - } + ConnectionQueryServices services = connection.getQueryServices(); + Throwable lastThrowable = null; + TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); + byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); + HTableInterface iterateOverTable = services.getTable(tableName); + try { + List<HRegionLocation> locations = services.getAllTableRegions(tableName); + Set<HRegionLocation> remainingOnServers = new HashSet<HRegionLocation>(servers); + /** + * Allow for the possibility that the region we based where to send our cache has split and been + * relocated to another region server *after* we sent it, but before we removed it. To accommodate + * this, we iterate through the current metadata boundaries and remove the cache once for each + * server that we originally sent to. + */ + if (LOG.isDebugEnabled()) {LOG.debug("Removing Cache " + cacheId + " from servers.");} + for (HRegionLocation entry : locations) { + if (remainingOnServers.contains(entry)) { // Call once per server + try { + byte[] key = entry.getRegionInfo().getStartKey(); + iterateOverTable.coprocessorService(ServerCachingService.class, key, key, + new Batch.Call<ServerCachingService, RemoveServerCacheResponse>() { + @Override + public RemoveServerCacheResponse call(ServerCachingService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback = + new BlockingRpcCallback<RemoveServerCacheResponse>(); + RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder(); + if(connection.getTenantId() != null){ + builder.setTenantId(HBaseZeroCopyByteString.wrap(connection.getTenantId().getBytes())); + } + builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId)); + instance.removeServerCache(controller, builder.build(), rpcCallback); + if(controller.getFailedOn() != null) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); + remainingOnServers.remove(entry); + } catch (Throwable t) { + lastThrowable = t; + LOG.error("Error trying to remove hash cache for " + entry, t); + } + } + } + if (!remainingOnServers.isEmpty()) { + LOG.warn("Unable to remove hash cache for " + remainingOnServers, lastThrowable); + } + } finally { + closeQuietly(iterateOverTable); + } } /**
