Repository: phoenix
Updated Branches:
  refs/heads/master 247ac8eee -> 4cd176e5f


PHOENIX-1090 - Fix HTable leak in ServerCacheClient (SamarthJain)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4cd176e5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4cd176e5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4cd176e5

Branch: refs/heads/master
Commit: 4cd176e5fd784a609ab5fb3a2d29460247d9b96a
Parents: 247ac8e
Author: Mujtaba <[email protected]>
Authored: Tue Jul 15 14:41:05 2014 -0700
Committer: Mujtaba <[email protected]>
Committed: Tue Jul 15 14:41:05 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/cache/ServerCacheClient.java | 100 ++++++++++---------
 1 file changed, 53 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4cd176e5/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 4c098c4..301c452 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.cache;
 
+import static com.google.common.io.Closeables.closeQuietly;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -268,53 +270,57 @@ public class ServerCacheClient {
      * @throws IllegalStateException if hashed table cannot be removed on any region server on which it was added
      */
     private void removeServerCache(final byte[] cacheId, Set<HRegionLocation> servers) throws SQLException {
-        ConnectionQueryServices services = connection.getQueryServices();
-        Throwable lastThrowable = null;
-        TableRef cacheUsingTableRef = 
cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
-        byte[] tableName = 
cacheUsingTableRef.getTable().getPhysicalName().getBytes();
-        HTableInterface iterateOverTable = services.getTable(tableName);
-        List<HRegionLocation> locations = 
services.getAllTableRegions(tableName);
-        Set<HRegionLocation> remainingOnServers = new 
HashSet<HRegionLocation>(servers);
-        /**
-         * Allow for the possibility that the region we based where to send 
our cache has split and been
-         * relocated to another region server *after* we sent it, but before 
we removed it. To accommodate
-         * this, we iterate through the current metadata boundaries and remove 
the cache once for each
-         * server that we originally sent to.
-         */
-        if (LOG.isDebugEnabled()) {LOG.debug("Removing Cache " + cacheId + " 
from servers.");}
-        for (HRegionLocation entry : locations) {
-            if (remainingOnServers.contains(entry)) {  // Call once per server
-                try {
-                    byte[] key = entry.getRegionInfo().getStartKey();
-                    
iterateOverTable.coprocessorService(ServerCachingService.class, key, key, 
-                        new Batch.Call<ServerCachingService, 
RemoveServerCacheResponse>() {
-                            @Override
-                            public RemoveServerCacheResponse 
call(ServerCachingService instance) throws IOException {
-                                ServerRpcController controller = new 
ServerRpcController();
-                                BlockingRpcCallback<RemoveServerCacheResponse> 
rpcCallback =
-                                        new 
BlockingRpcCallback<RemoveServerCacheResponse>();
-                                RemoveServerCacheRequest.Builder builder = 
RemoveServerCacheRequest.newBuilder();
-                                if(connection.getTenantId() != null){
-                                    
builder.setTenantId(HBaseZeroCopyByteString.wrap(connection.getTenantId().getBytes()));
-                                }
-                                
builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId));
-                                instance.removeServerCache(controller, 
builder.build(), rpcCallback);
-                                if(controller.getFailedOn() != null) {
-                                    throw controller.getFailedOn();
-                                }
-                                return rpcCallback.get(); 
-                            }
-                          });
-                    remainingOnServers.remove(entry);
-                } catch (Throwable t) {
-                    lastThrowable = t;
-                    LOG.error("Error trying to remove hash cache for " + 
entry, t);
-                }
-            }
-        }
-        if (!remainingOnServers.isEmpty()) {
-            LOG.warn("Unable to remove hash cache for " + remainingOnServers, 
lastThrowable);
-        }
+       ConnectionQueryServices services = connection.getQueryServices();
+       Throwable lastThrowable = null;
+       TableRef cacheUsingTableRef = 
cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+       byte[] tableName = 
cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+       HTableInterface iterateOverTable = services.getTable(tableName);
+       try {
+               List<HRegionLocation> locations = 
services.getAllTableRegions(tableName);
+               Set<HRegionLocation> remainingOnServers = new 
HashSet<HRegionLocation>(servers);
+               /**
+                * Allow for the possibility that the region we based where to 
send our cache has split and been
+                * relocated to another region server *after* we sent it, but 
before we removed it. To accommodate
+                * this, we iterate through the current metadata boundaries and 
remove the cache once for each
+                * server that we originally sent to.
+                */
+               if (LOG.isDebugEnabled()) {LOG.debug("Removing Cache " + 
cacheId + " from servers.");}
+               for (HRegionLocation entry : locations) {
+                       if (remainingOnServers.contains(entry)) {  // Call once 
per server
+                               try {
+                                       byte[] key = 
entry.getRegionInfo().getStartKey();
+                                       
iterateOverTable.coprocessorService(ServerCachingService.class, key, key, 
+                                                       new 
Batch.Call<ServerCachingService, RemoveServerCacheResponse>() {
+                                               @Override
+                                               public 
RemoveServerCacheResponse call(ServerCachingService instance) throws 
IOException {
+                                                       ServerRpcController 
controller = new ServerRpcController();
+                                                       
BlockingRpcCallback<RemoveServerCacheResponse> rpcCallback =
+                                                                       new 
BlockingRpcCallback<RemoveServerCacheResponse>();
+                                                       
RemoveServerCacheRequest.Builder builder = 
RemoveServerCacheRequest.newBuilder();
+                                                       
if(connection.getTenantId() != null){
+                                                               
builder.setTenantId(HBaseZeroCopyByteString.wrap(connection.getTenantId().getBytes()));
+                                                       }
+                                                       
builder.setCacheId(HBaseZeroCopyByteString.wrap(cacheId));
+                                                       
instance.removeServerCache(controller, builder.build(), rpcCallback);
+                                                       
if(controller.getFailedOn() != null) {
+                                                               throw 
controller.getFailedOn();
+                                                       }
+                                                       return 
rpcCallback.get(); 
+                                               }
+                                       });
+                                       remainingOnServers.remove(entry);
+                               } catch (Throwable t) {
+                                       lastThrowable = t;
+                                       LOG.error("Error trying to remove hash 
cache for " + entry, t);
+                               }
+                       }
+               }
+               if (!remainingOnServers.isEmpty()) {
+                       LOG.warn("Unable to remove hash cache for " + 
remainingOnServers, lastThrowable);
+               }
+       } finally {
+               closeQuietly(iterateOverTable);
+       }
     }
 
     /**

Reply via email to