Repository: phoenix
Updated Branches:
  refs/heads/4.11-HBase-1.2 3e81387c5 -> 2c22be8bb


PHOENIX-3981 Ensure iterators are closed for join plans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2c22be8b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2c22be8b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2c22be8b

Branch: refs/heads/4.11-HBase-1.2
Commit: 2c22be8bb77676fb18d816b0e6d5368af69dda7b
Parents: 3e81387
Author: Samarth Jain <[email protected]>
Authored: Wed Jun 28 13:34:05 2017 -0700
Committer: Samarth Jain <[email protected]>
Committed: Wed Jun 28 13:34:05 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/HashJoinPlan.java    | 102 +++++++++++--------
 .../phoenix/execute/SortMergeJoinPlan.java      |  32 +++++-
 2 files changed, 89 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c22be8b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 64e2ce2..17c3cca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -293,39 +293,43 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public ServerCache execute(HashJoinPlan parent) throws SQLException {
             List<Object> values = Lists.<Object> newArrayList();
             ResultIterator iterator = plan.iterator();
-            RowProjector projector = plan.getProjector();
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            int columnCount = projector.getColumnCount();
-            int rowCount = 0;
-            PDataType baseType = PVarbinary.INSTANCE;
-            for (Tuple tuple = iterator.next(); tuple != null; tuple = 
iterator.next()) {
-                if (expectSingleRow && rowCount >= 1)
-                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
-                
-                if (columnCount == 1) {
-                    ColumnProjector columnProjector = 
projector.getColumnProjector(0);
-                    baseType = columnProjector.getExpression().getDataType();
-                    Object value = columnProjector.getValue(tuple, baseType, 
ptr);
-                    values.add(value);
-                } else {
-                    List<Expression> expressions = 
Lists.<Expression>newArrayListWithExpectedSize(columnCount);
-                    for (int i = 0; i < columnCount; i++) {
-                        ColumnProjector columnProjector = 
projector.getColumnProjector(i);
-                        PDataType type = 
columnProjector.getExpression().getDataType();
-                        Object value = columnProjector.getValue(tuple, type, 
ptr);
-                        expressions.add(LiteralExpression.newConstant(value, 
type));
+            try {
+                RowProjector projector = plan.getProjector();
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                int columnCount = projector.getColumnCount();
+                int rowCount = 0;
+                PDataType baseType = PVarbinary.INSTANCE;
+                for (Tuple tuple = iterator.next(); tuple != null; tuple = 
iterator.next()) {
+                    if (expectSingleRow && rowCount >= 1)
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+
+                    if (columnCount == 1) {
+                        ColumnProjector columnProjector = 
projector.getColumnProjector(0);
+                        baseType = 
columnProjector.getExpression().getDataType();
+                        Object value = columnProjector.getValue(tuple, 
baseType, ptr);
+                        values.add(value);
+                    } else {
+                        List<Expression> expressions = 
Lists.<Expression>newArrayListWithExpectedSize(columnCount);
+                        for (int i = 0; i < columnCount; i++) {
+                            ColumnProjector columnProjector = 
projector.getColumnProjector(i);
+                            PDataType type = 
columnProjector.getExpression().getDataType();
+                            Object value = columnProjector.getValue(tuple, 
type, ptr);
+                            
expressions.add(LiteralExpression.newConstant(value, type));
+                        }
+                        Expression expression = new 
RowValueConstructorExpression(expressions, true);
+                        baseType = expression.getDataType();
+                        expression.evaluate(null, ptr);
+                        values.add(baseType.toObject(ptr));
                     }
-                    Expression expression = new 
RowValueConstructorExpression(expressions, true);
-                    baseType = expression.getDataType();
-                    expression.evaluate(null, ptr);
-                    values.add(baseType.toObject(ptr));
+                    rowCount++;
                 }
-                rowCount++;
+
+                Object result = expectSingleRow ? (values.isEmpty() ? null : 
values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, 
values.toArray());
+                parent.getContext().setSubqueryResult(select, result);
+                return null;
+            } finally {
+                iterator.close();
             }
-            
-            Object result = expectSingleRow ? (values.isEmpty() ? null : 
values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, 
values.toArray());
-            parent.getContext().setSubqueryResult(select, result);
-            return null;
         }
 
         @Override
@@ -383,21 +387,37 @@ public class HashJoinPlan extends DelegateQueryPlan {
             }
             ServerCache cache = null;
             if (hashExpressions != null) {
-                cache = parent.hashClient.addHashCache(ranges, 
plan.iterator(), 
-                        plan.getEstimatedSize(), hashExpressions, 
singleValueOnly, parent.delegate.getTableRef(), keyRangeRhsExpression, 
keyRangeRhsValues);
-                long endTime = System.currentTimeMillis();
-                boolean isSet = parent.firstJobEndTime.compareAndSet(0, 
endTime);
-                if (!isSet && (endTime - parent.firstJobEndTime.get()) > 
parent.maxServerCacheTimeToLive) {
-                    LOG.warn(addCustomAnnotations("Hash plan [" + index + "] 
execution seems too slow. Earlier hash cache(s) might have expired on 
servers.", parent.delegate.getContext().getConnection()));
+                ResultIterator iterator = plan.iterator();
+                try {
+                    cache =
+                            parent.hashClient.addHashCache(ranges, iterator,
+                                plan.getEstimatedSize(), hashExpressions, 
singleValueOnly,
+                                parent.delegate.getTableRef(), 
keyRangeRhsExpression,
+                                keyRangeRhsValues);
+                    long endTime = System.currentTimeMillis();
+                    boolean isSet = parent.firstJobEndTime.compareAndSet(0, 
endTime);
+                    if (!isSet && (endTime
+                            - parent.firstJobEndTime.get()) > 
parent.maxServerCacheTimeToLive) {
+                        LOG.warn(addCustomAnnotations(
+                            "Hash plan [" + index
+                                    + "] execution seems too slow. Earlier 
hash cache(s) might have expired on servers.",
+                            parent.delegate.getContext().getConnection()));
+                    }
+                } finally {
+                    iterator.close();
                 }
             } else {
-                assert(keyRangeRhsExpression != null);
+                assert (keyRangeRhsExpression != null);
                 ResultIterator iterator = plan.iterator();
-                for (Tuple result = iterator.next(); result != null; result = 
iterator.next()) {
-                    // Evaluate key expressions for hash join key range 
optimization.
-                    
keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression,
 result, plan.getContext().getTempPtr()));
+                try {
+                    for (Tuple result = iterator.next(); result != null; 
result = iterator.next()) {
+                        // Evaluate key expressions for hash join key range 
optimization.
+                        
keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(
+                            keyRangeRhsExpression, result, 
plan.getContext().getTempPtr()));
+                    }
+                } finally {
+                    iterator.close();
                 }
-                iterator.close();
             }
             if (keyRangeRhsValues != null) {
                 
parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression,
 keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), 
plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2c22be8b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 4b63c50..568094a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -239,6 +239,26 @@ public class SortMergeJoinPlan implements QueryPlan {
         return false;
     }
     
+    private static SQLException closeIterators(ResultIterator lhsIterator, 
ResultIterator rhsIterator) {
+        SQLException e = null;
+        try {
+            lhsIterator.close();
+        } catch (Throwable e1) {
+            e = e1 instanceof SQLException ? (SQLException)e1 : new 
SQLException(e1);
+        }
+        try {
+            rhsIterator.close();
+        } catch (Throwable e2) {
+            SQLException e22 = e2 instanceof SQLException ? (SQLException)e2 : 
new SQLException(e2);
+            if (e != null) {
+                e.setNextException(e22);
+            } else {
+                e = e22;
+            }
+        }
+        return e;
+    }
+
     private class BasicJoinIterator implements ResultIterator {
         private final ResultIterator lhsIterator;
         private final ResultIterator rhsIterator;
@@ -283,9 +303,11 @@ public class SortMergeJoinPlan implements QueryPlan {
         
         @Override
         public void close() throws SQLException {
-            lhsIterator.close();
-            rhsIterator.close();
+            SQLException e = closeIterators(lhsIterator, rhsIterator);
             queue.close();
+            if (e != null) {
+                throw e;
+            }
         }
 
         @Override
@@ -453,8 +475,10 @@ public class SortMergeJoinPlan implements QueryPlan {
 
         @Override
         public void close() throws SQLException {
-            lhsIterator.close();
-            rhsIterator.close();
+            SQLException e = closeIterators(lhsIterator, rhsIterator);
+            if (e != null) {
+                throw e;
+            }
         }
 
         @Override

Reply via email to