PHOENIX-2556 Subqueries with nested joins may not free hash cache
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a593e98f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a593e98f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a593e98f
Branch: refs/heads/4.x-HBase-1.0
Commit: a593e98fcd80804c00f2a95f51512f3661d1f18f
Parents: a7788da
Author: maryannxue <[email protected]>
Authored: Tue Jan 5 09:36:03 2016 -0500
Committer: maryannxue <[email protected]>
Committed: Tue Jan 5 09:36:03 2016 -0500
----------------------------------------------------------------------
 .../ConnectionQueryServicesTestImpl.java | 5 +-
 .../org/apache/phoenix/end2end/SubqueryIT.java | 48 ++++++++++++++++----
 .../apache/phoenix/compile/DeleteCompiler.java | 36 ++++++++-------
 .../apache/phoenix/execute/HashJoinPlan.java | 40 ++++++++--------
 4 files changed, 85 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java index f5d7f18..2d0ed60 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.end2end; +import static org.junit.Assert.assertEquals; + import java.sql.SQLException; import java.util.Properties; import java.util.Set; @@ -82,8 +84,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl this.connections = Sets.newHashSet(); SQLCloseables.closeAll(connections); long unfreedBytes = 
clearCache(); - // FIXME: once PHOENIX-2556 is fixed, comment this back in - // assertEquals(0,unfreedBytes); + assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes); } finally { super.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java index 794c4f5..90ce327 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java @@ -45,12 +45,14 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -366,6 +368,17 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { } } + @After + public void assertNoUnfreedMemory() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + try { + long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + assertEquals("Found bytes not freed on server side", 0, unfreedBytes); + } finally { + conn.close(); + } + } + @Test public void testNonCorrelatedSubquery() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -628,9 +641,12 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { @Test public void testComparisonSubquery() throws Exception { Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); + final Connection conn = DriverManager.getConnection(getUrl(), props); try { - String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")"; + String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")"; PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue (rs.next()); @@ -648,7 +664,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); - query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " q ON i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" = i2.\"item_id\")"; + query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + + JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + JOIN_ORDER_TABLE_FULL_NAME + + " q ON i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" = i2.\"item_id\")"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); assertTrue (rs.next()); @@ -666,7 +686,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); - query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + " WHERE \"customer_id\" IN (SELECT 
\"customer_id\" FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' OR quantity > (SELECT avg(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\"))"; + query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + + " WHERE \"customer_id\" IN (SELECT \"customer_id\" FROM " + + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + + " o ON o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' OR quantity > (SELECT avg(quantity) FROM " + + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\"))"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); assertTrue (rs.next()); @@ -680,7 +704,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { String plan = QueryUtil.getExplainPlan(rs); assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", Pattern.matches(plans[4], plan)); - query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')"; + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); assertTrue (rs.next()); @@ -694,7 +720,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); - query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')"; + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " WHERE 
o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); try { @@ -703,7 +731,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { } catch (SQLException e) { } - query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")"; + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); assertTrue (rs.next()); @@ -717,7 +747,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); - query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")"; + query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")"; statement = conn.prepareStatement(query); rs = statement.executeQuery(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index ac9f7d1..924ed43 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -618,23 +618,27 @@ public class DeleteCompiler { @Override public MutationState execute() throws SQLException { ResultIterator iterator = plan.iterator(); - if (!hasLimit) { - Tuple tuple; - long totalRowCount = 0; - while ((tuple=iterator.next()) != null) {// Runs query - Cell kv = tuple.getValue(0); - totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); + try { + if (!hasLimit) { + Tuple tuple; + long totalRowCount = 0; + while ((tuple=iterator.next()) != null) {// Runs query + Cell kv = tuple.getValue(0); + totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault()); + } + // Return total number of rows that have been delete. In the case of auto commit being off + // the mutations will all be in the mutation state of the current connection. + MutationState state = new MutationState(maxSize, connection, totalRowCount); + + // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. + state.setReadMetricQueue(plan.getContext().getReadMetricsQueue()); + + return state; + } else { + return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); } - // Return total number of rows that have been delete. In the case of auto commit being off - // the mutations will all be in the mutation state of the current connection. - MutationState state = new MutationState(maxSize, connection, totalRowCount); - - // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed. 
- state.setReadMetricQueue(plan.getContext().getReadMetricsQueue()); - - return state; - } else { - return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef()); + } finally { + iterator.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index cf89380..053dc2b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -88,7 +88,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final boolean recompileWhereClause; private final Set<TableRef> tableRefs; private final int maxServerCacheTimeToLive; - private List<SQLCloseable> dependencies; + private final List<SQLCloseable> dependencies = Lists.newArrayList(); private HashCacheClient hashClient; private AtomicLong firstJobEndTime; private List<Expression> keyRangeExpressions; @@ -96,7 +96,7 @@ public class HashJoinPlan extends DelegateQueryPlan { public static HashJoinPlan create(SelectStatement statement, QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) { if (!(plan instanceof HashJoinPlan)) - return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null); + return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<SQLCloseable>emptyList()); HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan); @@ -108,12 +108,13 @@ public class HashJoinPlan extends DelegateQueryPlan { for (SubPlan subPlan : subPlans) { mergedSubPlans[i++] = subPlan; } - 
return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true); + return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true, hashJoinPlan.dependencies); } private HashJoinPlan(SelectStatement statement, - QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) { + QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, List<SQLCloseable> dependencies) { super(plan); + this.dependencies.addAll(dependencies); this.statement = statement; this.joinInfo = joinInfo; this.subPlans = subPlans; @@ -143,8 +144,7 @@ public class HashJoinPlan extends DelegateQueryPlan { PhoenixConnection connection = getContext().getConnection(); ConnectionQueryServices services = connection.getQueryServices(); ExecutorService executor = services.getExecutor(); - List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count); - dependencies = Lists.newArrayList(); + List<Future<ServerCache>> futures = Lists.newArrayListWithExpectedSize(count); if (joinInfo != null) { hashClient = hashClient != null ? 
hashClient @@ -155,11 +155,12 @@ public class HashJoinPlan extends DelegateQueryPlan { for (int i = 0; i < count; i++) { final int index = i; - futures.add(executor.submit(new JobCallable<Object>() { + futures.add(executor.submit(new JobCallable<ServerCache>() { @Override - public Object call() throws Exception { - return subPlans[index].execute(HashJoinPlan.this); + public ServerCache call() throws Exception { + ServerCache cache = subPlans[index].execute(HashJoinPlan.this); + return cache; } @Override @@ -177,7 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan { SQLException firstException = null; for (int i = 0; i < count; i++) { try { - Object result = futures.get(i).get(); + ServerCache result = futures.get(i).get(); + if (result != null) { + dependencies.add(result); + } subPlans[i].postProcess(result, this); } catch (InterruptedException e) { if (firstException == null) { @@ -261,8 +265,8 @@ public class HashJoinPlan extends DelegateQueryPlan { } protected interface SubPlan { - public Object execute(HashJoinPlan parent) throws SQLException; - public void postProcess(Object result, HashJoinPlan parent) throws SQLException; + public ServerCache execute(HashJoinPlan parent) throws SQLException; + public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException; public List<String> getPreSteps(HashJoinPlan parent) throws SQLException; public List<String> getPostSteps(HashJoinPlan parent) throws SQLException; public QueryPlan getInnerPlan(); @@ -280,7 +284,7 @@ public class HashJoinPlan extends DelegateQueryPlan { } @Override - public Object execute(HashJoinPlan parent) throws SQLException { + public ServerCache execute(HashJoinPlan parent) throws SQLException { List<Object> values = Lists.<Object> newArrayList(); ResultIterator iterator = plan.iterator(); RowProjector projector = plan.getProjector(); @@ -319,7 +323,7 @@ public class HashJoinPlan extends DelegateQueryPlan { } @Override - public void postProcess(Object result, 
HashJoinPlan parent) throws SQLException { + public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException { } @Override @@ -365,7 +369,7 @@ public class HashJoinPlan extends DelegateQueryPlan { } @Override - public Object execute(HashJoinPlan parent) throws SQLException { + public ServerCache execute(HashJoinPlan parent) throws SQLException { ScanRanges ranges = parent.delegate.getContext().getScanRanges(); List<Expression> keyRangeRhsValues = null; if (keyRangeRhsExpression != null) { @@ -387,6 +391,7 @@ public class HashJoinPlan extends DelegateQueryPlan { // Evaluate key expressions for hash join key range optimization. keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression, result, plan.getContext().getTempPtr())); } + iterator.close(); } if (keyRangeRhsValues != null) { parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable())); @@ -395,12 +400,11 @@ public class HashJoinPlan extends DelegateQueryPlan { } @Override - public void postProcess(Object result, HashJoinPlan parent) + public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException { - ServerCache cache = (ServerCache) result; + ServerCache cache = result; if (cache != null) { parent.joinInfo.getJoinIds()[index].set(cache.getId()); - parent.dependencies.add(cache); } }
