PHOENIX-2558 Fix server-side cache memory leaks
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1cd1d268 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1cd1d268 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1cd1d268 Branch: refs/heads/4.x-HBase-1.0 Commit: 1cd1d26824641a6125236a5d02f43fc22c6df5c3 Parents: 31a414c Author: James Taylor <[email protected]> Authored: Sun Jan 3 17:08:31 2016 -0800 Committer: James Taylor <[email protected]> Committed: Mon Jan 4 09:36:39 2016 -0800 ---------------------------------------------------------------------- .../end2end/BaseTenantSpecificViewIndexIT.java | 2 -- .../org/apache/phoenix/end2end/HashJoinIT.java | 14 +++++++++++++ .../apache/phoenix/end2end/HashJoinMoreIT.java | 15 +++++++++++++ .../end2end/QueryDatabaseMetaDataIT.java | 18 +++++++--------- .../apache/phoenix/end2end/QueryTimeoutIT.java | 14 +++++++++++++ .../apache/phoenix/cache/ServerCacheClient.java | 13 +++++++++++- .../apache/phoenix/execute/MutationState.java | 16 +++++++------- .../phoenix/iterate/BaseResultIterators.java | 22 +++++++++++++++++++- .../MaterializedComparableResultIterator.java | 3 +++ .../apache/phoenix/jdbc/PhoenixResultSet.java | 3 +++ 10 files changed, 98 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java index b450643..c10afa6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java @@ -19,7 +19,6 @@ package org.apache.phoenix.end2end; import static com.google.common.collect.Sets.newHashSet; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import java.sql.Connection; import java.sql.DriverManager; @@ -178,6 +177,5 @@ public class BaseTenantSpecificViewIndexIT extends BaseHBaseManagedTimeIT { Arrays.<Object>asList(1,7, valuePrefix + "v2-1"), Arrays.<Object>asList(1,9, valuePrefix + "v2-1")); assertValuesEqualsResultSet(rs,expectedResultsA); - assertFalse(rs.next()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index 3d8b006..dd7f6ba 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -50,12 +50,14 @@ import java.util.Map; import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -78,6 +80,18 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { this.plans = plans; } + @After + public void assertNoUnfreedMemory() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + try { + long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + assertEquals(0,unfreedBytes); + } finally { + conn.close(); + } + } + + @BeforeClass @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java index a8bb977..98264f0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java @@ -27,14 +27,17 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.Map; import java.util.Properties; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -101,6 +104,17 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + @After + public void assertNoUnfreedMemory() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + try { + long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + assertEquals(0,unfreedBytes); + } finally { + conn.close(); + } + } + @Test public void testJoinOverSaltedTables() throws Exception { String tempTableNoSalting = "TEMP_TABLE_NO_SALTING"; @@ -555,6 +569,7 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT { assertTrue(rs.next()); assertEquals(rs.getInt(1), 5); assertFalse(rs.next()); + rs.close(); rs = conn.createStatement().executeQuery( "SELECT * FROM INVENTORY RIGHT JOIN PRODUCT_IDS ON (PRODUCT_ID = INVENTORY.ID)"); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java index 2fdccf6..ba83e6a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java @@ -48,13 +48,11 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Arrays; import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; @@ -69,15 +67,15 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.ColumnNotFoundException; -import org.apache.phoenix.schema.types.PDecimal; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -151,17 +149,15 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT { rs = dbmd.getTables(null, CUSTOM_ENTITY_DATA_SCHEMA_NAME, CUSTOM_ENTITY_DATA_NAME, null); assertTrue(rs.next()); - assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME); - assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME); - assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE")); - assertFalse(rs.next()); - try { rs.getString("RANDOM_COLUMN_NAME"); fail(); } catch (ColumnNotFoundException e) { // expected } + assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME); + assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME); + assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE")); assertFalse(rs.next()); rs = dbmd.getTables(null, "", "_TABLE", new String[] {PTableType.TABLE.toString()}); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java index ba7b461..ccd6530 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java @@ -28,15 +28,18 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.Map; import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -56,6 +59,17 @@ public class QueryTimeoutIT extends BaseOwnClusterHBaseManagedTimeIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + @After + public void assertNoUnfreedMemory() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + try { + long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + assertEquals(0,unfreedBytes); + } finally { + conn.close(); + } + } + @Test public void testQueryTimeout() throws Exception { int nRows = 30000; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index f188ab2..424482a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -301,6 +301,7 @@ public class ServerCacheClient { ConnectionQueryServices services = connection.getQueryServices(); Throwable lastThrowable = null; TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId)); + final PTable cacheUsingTable = cacheUsingTableRef.getTable(); byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes(); HTableInterface iterateOverTable = services.getTable(tableName); try { @@ -326,7 +327,17 @@ public class ServerCacheClient { new BlockingRpcCallback<RemoveServerCacheResponse>(); RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder(); if(connection.getTenantId() != null){ - builder.setTenantId(ByteStringer.wrap(connection.getTenantId().getBytes())); + try { + byte[] tenantIdBytes = + ScanUtil.getTenantIdBytes( + cacheUsingTable.getRowKeySchema(), + cacheUsingTable.getBucketNum()!=null, + connection.getTenantId(), + cacheUsingTable.isMultiTenant()); + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + } catch (SQLException e) { + new IOException(e); + } } builder.setCacheId(ByteStringer.wrap(cacheId)); instance.removeServerCache(controller, builder.build(), rpcCallback); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 8ae9481..bdc9460 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -81,6 +81,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; +import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; @@ -703,6 +704,8 @@ public class MutationState implements SQLCloseable { */ @Override public void delete(List<Delete> deletes) throws IOException { + ServerCache cache = null; + SQLException sqlE = null; try { PTable table = tableRef.getTable(); List<PTable> indexes = table.getIndexes(); @@ -736,12 +739,16 @@ public class MutationState implements SQLCloseable { IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection); } if (attachMetaData) { - setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); + cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); } } delegate.delete(deletes); } catch (SQLException e) { throw new IOException(e); + } finally { + if (cache != null) { + SQLCloseables.closeAllQuietly(Collections.singletonList(cache)); + } } } } @@ -793,10 +800,7 @@ public class MutationState implements SQLCloseable { int retryCount = 0; boolean shouldRetry = false; do { - ServerCache cache = null; - if (isDataTable) { - cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr); - } + final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null; // If we haven't retried yet, retry for this case only, as it's possible that // a split will occur after we send the index metadata cache to all known @@ -879,8 +883,6 @@ public class MutationState implements SQLCloseable { } } if (sqlE != null) { - // clear pending mutations - mutations.clear(); throw sqlE; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 21f082f..2806acd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -704,6 +704,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // to get into a funk. Instead, just cancel queued work. boolean cancelledWork = false; try { + List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size()); for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) { for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) { for (Pair<Scan,Future<PeekingResultIterator>> futurePair : futureScans) { @@ -712,16 +713,35 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (futurePair != null) { Future<PeekingResultIterator> future = futurePair.getSecond(); if (future != null) { - future.cancel(false); + if (future.cancel(false)) { + cancelledWork = true; + } else { + futuresToClose.add(future); + } } } } } } + // Wait for already started tasks to complete as we can't interrupt them without + // leaving our HConnection in a funky state. + for (Future<PeekingResultIterator> future : futuresToClose) { + try { + PeekingResultIterator iterator = future.get(); + iterator.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + logger.info("Failed to execute task during cancel", e); + continue; + } + } } finally { if (cancelledWork) { context.getConnection().getQueryServices().getExecutor().purge(); } + allFutures.clear(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java index 093a098..a76f1e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java @@ -45,16 +45,19 @@ public class MaterializedComparableResultIterator this.current = delegate.peek(); } + @Override public Tuple next() throws SQLException { Tuple next = delegate.next(); this.current = delegate.peek(); return next; } + @Override public Tuple peek() throws SQLException { return delegate.peek(); } + @Override public void close() throws SQLException { delegate.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1cd1d268/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index a3ce1a1..47c17ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -776,6 +776,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { overAllQueryMetrics.startResultSetWatch(); } currentRow = scanner.next(); + if (currentRow == null) { + close(); + } rowProjector.reset(); } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException
