PHOENIX-2558 Fix server-side cache memory leaks

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1cd1d268
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1cd1d268
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1cd1d268

Branch: refs/heads/4.x-HBase-1.0
Commit: 1cd1d26824641a6125236a5d02f43fc22c6df5c3
Parents: 31a414c
Author: James Taylor <[email protected]>
Authored: Sun Jan 3 17:08:31 2016 -0800
Committer: James Taylor <[email protected]>
Committed: Mon Jan 4 09:36:39 2016 -0800

----------------------------------------------------------------------
 .../end2end/BaseTenantSpecificViewIndexIT.java  |  2 --
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 14 +++++++++++++
 .../apache/phoenix/end2end/HashJoinMoreIT.java  | 15 +++++++++++++
 .../end2end/QueryDatabaseMetaDataIT.java        | 18 +++++++---------
 .../apache/phoenix/end2end/QueryTimeoutIT.java  | 14 +++++++++++++
 .../apache/phoenix/cache/ServerCacheClient.java | 13 +++++++++++-
 .../apache/phoenix/execute/MutationState.java   | 16 +++++++-------
 .../phoenix/iterate/BaseResultIterators.java    | 22 +++++++++++++++++++-
 .../MaterializedComparableResultIterator.java   |  3 +++
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  3 +++
 10 files changed, 98 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
index b450643..c10afa6 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.end2end;
 
 import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -178,6 +177,5 @@ public class BaseTenantSpecificViewIndexIT extends 
BaseHBaseManagedTimeIT {
             Arrays.<Object>asList(1,7, valuePrefix + "v2-1"),
             Arrays.<Object>asList(1,9, valuePrefix + "v2-1"));
         assertValuesEqualsResultSet(rs,expectedResultsA);
-        assertFalse(rs.next());
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 3d8b006..dd7f6ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -50,12 +50,14 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -78,6 +80,18 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         this.plans = plans;
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = 
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    
     @BeforeClass
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
index a8bb977..98264f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
@@ -27,14 +27,17 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -101,6 +104,17 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT 
{
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = 
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testJoinOverSaltedTables() throws Exception {
         String tempTableNoSalting = "TEMP_TABLE_NO_SALTING";
@@ -555,6 +569,7 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT {
             assertTrue(rs.next());
             assertEquals(rs.getInt(1), 5);
             assertFalse(rs.next());
+            rs.close();
             rs = conn.createStatement().executeQuery(
                     "SELECT * FROM INVENTORY RIGHT JOIN PRODUCT_IDS ON 
(PRODUCT_ID = INVENTORY.ID)");
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 2fdccf6..ba83e6a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -48,13 +48,11 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -69,15 +67,15 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -151,17 +149,15 @@ public class QueryDatabaseMetaDataIT extends 
BaseClientManagedTimeIT {
 
         rs = dbmd.getTables(null, CUSTOM_ENTITY_DATA_SCHEMA_NAME, 
CUSTOM_ENTITY_DATA_NAME, null);
         assertTrue(rs.next());
-        
assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME);
-        assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME);
-        assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE"));
-        assertFalse(rs.next());
-        
         try {
             rs.getString("RANDOM_COLUMN_NAME");
             fail();
         } catch (ColumnNotFoundException e) {
             // expected
         }
+        
assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME);
+        assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME);
+        assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE"));
         assertFalse(rs.next());
         
         rs = dbmd.getTables(null, "", "_TABLE", new String[] 
{PTableType.TABLE.toString()});

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
index ba7b461..ccd6530 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
@@ -28,15 +28,18 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.SQLTimeoutException;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -56,6 +59,17 @@ public class QueryTimeoutIT extends 
BaseOwnClusterHBaseManagedTimeIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = 
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testQueryTimeout() throws Exception {
         int nRows = 30000;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index f188ab2..424482a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -301,6 +301,7 @@ public class ServerCacheClient {
        ConnectionQueryServices services = connection.getQueryServices();
        Throwable lastThrowable = null;
        TableRef cacheUsingTableRef = 
cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+       final PTable cacheUsingTable = cacheUsingTableRef.getTable();
        byte[] tableName = 
cacheUsingTableRef.getTable().getPhysicalName().getBytes();
        HTableInterface iterateOverTable = services.getTable(tableName);
        try {
@@ -326,7 +327,17 @@ public class ServerCacheClient {
                                                                        new 
BlockingRpcCallback<RemoveServerCacheResponse>();
                                                        
RemoveServerCacheRequest.Builder builder = 
RemoveServerCacheRequest.newBuilder();
                                                        
if(connection.getTenantId() != null){
-                                                               
builder.setTenantId(ByteStringer.wrap(connection.getTenantId().getBytes()));
+                                    try {
+                                        byte[] tenantIdBytes =
+                                                ScanUtil.getTenantIdBytes(
+                                                        
cacheUsingTable.getRowKeySchema(),
+                                                        
cacheUsingTable.getBucketNum()!=null,
+                                                        
connection.getTenantId(),
+                                                        
cacheUsingTable.isMultiTenant());
+                                        
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+                                    } catch (SQLException e) {
+                                        throw new IOException(e); // rethrow: the exception was previously created and dropped
+                                    }
                                                        }
                                                        
builder.setCacheId(ByteStringer.wrap(cacheId));
                                                        
instance.removeServerCache(controller, builder.build(), rpcCallback);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8ae9481..bdc9460 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -81,6 +81,7 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
@@ -703,6 +704,8 @@ public class MutationState implements SQLCloseable {
          */
         @Override
         public void delete(List<Delete> deletes) throws IOException {
+            ServerCache cache = null;
+            SQLException sqlE = null;
             try {
                 PTable table = tableRef.getTable();
                 List<PTable> indexes = table.getIndexes();
@@ -736,12 +739,16 @@ public class MutationState implements SQLCloseable {
                         IndexMaintainer.serializeAdditional(table, 
indexMetaDataPtr, keyValueIndexes, connection);
                     }
                     if (attachMetaData) {
-                        setMetaDataOnMutations(tableRef, deletes, 
indexMetaDataPtr);
+                        cache = setMetaDataOnMutations(tableRef, deletes, 
indexMetaDataPtr);
                     }
                 }
                 delegate.delete(deletes);
             } catch (SQLException e) {
                 throw new IOException(e);
+            } finally {
+                if (cache != null) {
+                    
SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
+                }
             }
         }
     }
@@ -793,10 +800,7 @@ public class MutationState implements SQLCloseable {
                        int retryCount = 0;
                        boolean shouldRetry = false;
                        do {
-                           ServerCache cache = null;
-                           if (isDataTable) {
-                               cache = setMetaDataOnMutations(tableRef, 
mutationList, indexMetaDataPtr);
-                           }
+                           final ServerCache cache = isDataTable ? 
setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
                        
                            // If we haven't retried yet, retry for this case 
only, as it's possible that
                            // a split will occur after we send the index 
metadata cache to all known
@@ -879,8 +883,6 @@ public class MutationState implements SQLCloseable {
                                        }
                                    } 
                                    if (sqlE != null) {
-                                       // clear pending mutations
-                                       mutations.clear();
                                        throw sqlE;
                                    }
                                }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 21f082f..2806acd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -704,6 +704,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         // to get into a funk. Instead, just cancel queued work.
         boolean cancelledWork = false;
         try {
+            List<Future<PeekingResultIterator>> futuresToClose = 
Lists.newArrayListWithExpectedSize(getSplits().size());
             for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures 
: allFutures) {
                 for (List<Pair<Scan,Future<PeekingResultIterator>>> 
futureScans : futures) {
                     for (Pair<Scan,Future<PeekingResultIterator>> futurePair : 
futureScans) {
@@ -712,16 +713,35 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                         if (futurePair != null) {
                             Future<PeekingResultIterator> future = 
futurePair.getSecond();
                             if (future != null) {
-                                future.cancel(false);
+                                if (future.cancel(false)) {
+                                    cancelledWork = true;
+                                } else {
+                                    futuresToClose.add(future);
+                                }
                             }
                         }
                     }
                 }
             }
+            // Wait for already started tasks to complete as we can't 
interrupt them without
+            // leaving our HConnection in a funky state.
+            for (Future<PeekingResultIterator> future : futuresToClose) {
+                try {
+                    PeekingResultIterator iterator = future.get();
+                    iterator.close();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    logger.info("Failed to execute task during cancel", e);
+                    continue;
+                }
+            }
         } finally {
             if (cancelledWork) {
                 
context.getConnection().getQueryServices().getExecutor().purge();
             }
+            allFutures.clear();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
index 093a098..a76f1e3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
@@ -45,16 +45,19 @@ public class MaterializedComparableResultIterator
         this.current = delegate.peek();
     }
 
+    @Override
     public Tuple next() throws SQLException {
         Tuple next = delegate.next();
         this.current = delegate.peek();
         return next;
     }
 
+    @Override
     public Tuple peek() throws SQLException {
         return delegate.peek();
     }
 
+    @Override
     public void close() throws SQLException {
         delegate.close();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index a3ce1a1..47c17ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -776,6 +776,9 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
                 overAllQueryMetrics.startResultSetWatch();
             }
             currentRow = scanner.next();
+            if (currentRow == null) {
+                close();
+            }
             rowProjector.reset();
         } catch (RuntimeException e) {
             // FIXME: Expression.evaluate does not throw SQLException

Reply via email to