PHOENIX-2556 Subqueries with nested joins may not free hash cache

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a593e98f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a593e98f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a593e98f

Branch: refs/heads/4.x-HBase-1.0
Commit: a593e98fcd80804c00f2a95f51512f3661d1f18f
Parents: a7788da
Author: maryannxue <[email protected]>
Authored: Tue Jan 5 09:36:03 2016 -0500
Committer: maryannxue <[email protected]>
Committed: Tue Jan 5 09:36:03 2016 -0500

----------------------------------------------------------------------
 .../ConnectionQueryServicesTestImpl.java        |  5 +-
 .../org/apache/phoenix/end2end/SubqueryIT.java  | 48 ++++++++++++++++----
 .../apache/phoenix/compile/DeleteCompiler.java  | 36 ++++++++-------
 .../apache/phoenix/execute/HashJoinPlan.java    | 40 ++++++++--------
 4 files changed, 85 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index f5d7f18..2d0ed60 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertEquals;
+
 import java.sql.SQLException;
 import java.util.Properties;
 import java.util.Set;
@@ -82,8 +84,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
             this.connections = Sets.newHashSet();
             SQLCloseables.closeAll(connections);
             long unfreedBytes = clearCache();
-            // FIXME: once PHOENIX-2556 is fixed, comment this back in
-            // assertEquals(0,unfreedBytes);
+            assertEquals("Found unfreed bytes in server-side cache", 0, 
unfreedBytes);
         } finally {
             super.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index 794c4f5..90ce327 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -45,12 +45,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -366,6 +368,17 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = 
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals("Found bytes not freed on server side", 0, 
unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testNonCorrelatedSubquery() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -628,9 +641,12 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
     @Test
     public void testComparisonSubquery() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        final Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
-            String query = "SELECT \"order_id\", name FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON 
o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+            String query = "SELECT \"order_id\", name FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + 
+                    " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = 
(SELECT max(quantity) FROM " + 
+                    JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = 
q.\"item_id\")";
             PreparedStatement statement = conn.prepareStatement(query);
             ResultSet rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -648,7 +664,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\", name FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON 
o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + 
JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " q ON 
i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" = i2.\"item_id\")";
+            query = "SELECT \"order_id\", name FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + 
+                    " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = 
(SELECT max(quantity) FROM " + 
+                    JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " q ON i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" 
= i2.\"item_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -666,7 +686,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + " 
WHERE \"customer_id\" IN (SELECT \"customer_id\" FROM " + 
JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " o ON 
o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' OR quantity > (SELECT 
avg(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = 
q.\"item_id\"))";
+            query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + 
+                    " WHERE \"customer_id\" IN (SELECT \"customer_id\" FROM " 
+ 
+                    JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o ON o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' 
OR quantity > (SELECT avg(quantity) FROM " + 
+                    JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = 
q.\"item_id\"))";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -680,7 +704,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             String plan = QueryUtil.getExplainPlan(rs);
             assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", 
Pattern.matches(plans[4], plan));
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
" o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " 
WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT quantity FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != 
'000000000000004')";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -694,7 +720,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
" o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " 
WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT quantity FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != 
'000000000000003')";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             try {
@@ -703,7 +731,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             } catch (SQLException e) {                
             }
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
" o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME 
+ " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' 
GROUP BY \"order_id\")";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT max(quantity) FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != 
'000000000000004' GROUP BY \"order_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -717,7 +747,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
" o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME 
+ " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' 
GROUP BY \"order_id\")";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT max(quantity) FROM " + 
JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != 
'000000000000003' GROUP BY \"order_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ac9f7d1..924ed43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -618,23 +618,27 @@ public class DeleteCompiler {
                     @Override
                     public MutationState execute() throws SQLException {
                         ResultIterator iterator = plan.iterator();
-                        if (!hasLimit) {
-                            Tuple tuple;
-                            long totalRowCount = 0;
-                            while ((tuple=iterator.next()) != null) {// Runs 
query
-                                Cell kv = tuple.getValue(0);
-                                totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault());
+                        try {
+                            if (!hasLimit) {
+                                Tuple tuple;
+                                long totalRowCount = 0;
+                                while ((tuple=iterator.next()) != null) {// 
Runs query
+                                    Cell kv = tuple.getValue(0);
+                                    totalRowCount += 
PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), 
SortOrder.getDefault());
+                                }
+                                // Return total number of rows that have been 
delete. In the case of auto commit being off
+                                // the mutations will all be in the mutation 
state of the current connection.
+                                MutationState state = new 
MutationState(maxSize, connection, totalRowCount);
+
+                                // set the read metrics accumulated in the 
parent context so that it can be published when the mutations are committed.
+                                
state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+
+                                return state;
+                            } else {
+                                return deleteRows(plan.getContext(), tableRef, 
deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, 
plan.getProjector(), plan.getTableRef());
                             }
-                            // Return total number of rows that have been 
delete. In the case of auto commit being off
-                            // the mutations will all be in the mutation state 
of the current connection.
-                            MutationState state = new MutationState(maxSize, 
connection, totalRowCount);
-                            
-                            // set the read metrics accumulated in the parent 
context so that it can be published when the mutations are committed.
-                            
state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
-                            
-                            return state;
-                        } else {
-                            return deleteRows(plan.getContext(), tableRef, 
deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, 
plan.getProjector(), plan.getTableRef());
+                        } finally {
+                            iterator.close();
                         }
                     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a593e98f/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index cf89380..053dc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -88,7 +88,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
-    private List<SQLCloseable> dependencies;
+    private final List<SQLCloseable> dependencies = Lists.newArrayList();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
@@ -96,7 +96,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
         if (!(plan instanceof HashJoinPlan))
-            return new HashJoinPlan(statement, plan, joinInfo, subPlans, 
joinInfo == null);
+            return new HashJoinPlan(statement, plan, joinInfo, subPlans, 
joinInfo == null, Collections.<SQLCloseable>emptyList());
         
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
         assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate 
instanceof BaseQueryPlan);
@@ -108,12 +108,13 @@ public class HashJoinPlan extends DelegateQueryPlan {
         for (SubPlan subPlan : subPlans) {
             mergedSubPlans[i++] = subPlan;
         }
-        return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, 
mergedSubPlans, true);
+        return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, 
mergedSubPlans, true, hashJoinPlan.dependencies);
     }
     
     private HashJoinPlan(SelectStatement statement, 
-            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean 
recompileWhereClause) {
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean 
recompileWhereClause, List<SQLCloseable> dependencies) {
         super(plan);
+        this.dependencies.addAll(dependencies);
         this.statement = statement;
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
@@ -143,8 +144,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
         ExecutorService executor = services.getExecutor();
-        List<Future<Object>> futures = 
Lists.<Future<Object>>newArrayListWithExpectedSize(count);
-        dependencies = Lists.newArrayList();
+        List<Future<ServerCache>> futures = 
Lists.newArrayListWithExpectedSize(count);
         if (joinInfo != null) {
             hashClient = hashClient != null ? 
                     hashClient 
@@ -155,11 +155,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
         
         for (int i = 0; i < count; i++) {
             final int index = i;
-            futures.add(executor.submit(new JobCallable<Object>() {
+            futures.add(executor.submit(new JobCallable<ServerCache>() {
 
                 @Override
-                public Object call() throws Exception {
-                    return subPlans[index].execute(HashJoinPlan.this);
+                public ServerCache call() throws Exception {
+                    ServerCache cache = 
subPlans[index].execute(HashJoinPlan.this);
+                    return cache;
                 }
 
                 @Override
@@ -177,7 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
         SQLException firstException = null;
         for (int i = 0; i < count; i++) {
             try {
-                Object result = futures.get(i).get();
+                ServerCache result = futures.get(i).get();
+                if (result != null) {
+                    dependencies.add(result);
+                }
                 subPlans[i].postProcess(result, this);
             } catch (InterruptedException e) {
                 if (firstException == null) {
@@ -261,8 +265,8 @@ public class HashJoinPlan extends DelegateQueryPlan {
     }
 
     protected interface SubPlan {
-        public Object execute(HashJoinPlan parent) throws SQLException;
-        public void postProcess(Object result, HashJoinPlan parent) throws 
SQLException;
+        public ServerCache execute(HashJoinPlan parent) throws SQLException;
+        public void postProcess(ServerCache result, HashJoinPlan parent) 
throws SQLException;
         public List<String> getPreSteps(HashJoinPlan parent) throws 
SQLException;
         public List<String> getPostSteps(HashJoinPlan parent) throws 
SQLException;
         public QueryPlan getInnerPlan();
@@ -280,7 +284,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public Object execute(HashJoinPlan parent) throws SQLException {
+        public ServerCache execute(HashJoinPlan parent) throws SQLException {
             List<Object> values = Lists.<Object> newArrayList();
             ResultIterator iterator = plan.iterator();
             RowProjector projector = plan.getProjector();
@@ -319,7 +323,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public void postProcess(Object result, HashJoinPlan parent) throws 
SQLException {
+        public void postProcess(ServerCache result, HashJoinPlan parent) 
throws SQLException {
         }
 
         @Override
@@ -365,7 +369,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public Object execute(HashJoinPlan parent) throws SQLException {
+        public ServerCache execute(HashJoinPlan parent) throws SQLException {
             ScanRanges ranges = parent.delegate.getContext().getScanRanges();
             List<Expression> keyRangeRhsValues = null;
             if (keyRangeRhsExpression != null) {
@@ -387,6 +391,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
                     // Evaluate key expressions for hash join key range 
optimization.
                     
keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression,
 result, plan.getContext().getTempPtr()));
                 }
+                iterator.close();
             }
             if (keyRangeRhsValues != null) {
                 
parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression,
 keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), 
plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable()));
@@ -395,12 +400,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public void postProcess(Object result, HashJoinPlan parent)
+        public void postProcess(ServerCache result, HashJoinPlan parent)
                 throws SQLException {
-            ServerCache cache = (ServerCache) result;
+            ServerCache cache = result;
             if (cache != null) {
                 parent.joinInfo.getJoinIds()[index].set(cache.getId());
-                parent.dependencies.add(cache);
             }
         }
 

Reply via email to