Repository: phoenix Updated Branches: refs/heads/4.11-HBase-0.98 21fb47495 -> 75e909ec9
PHOENIX-3981 Ensure iterators are closed for join plans Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/75e909ec Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/75e909ec Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/75e909ec Branch: refs/heads/4.11-HBase-0.98 Commit: 75e909ec9599ff5cce3c9f2cd09f26e1a379152d Parents: 21fb474 Author: Samarth Jain <[email protected]> Authored: Wed Jun 28 13:33:06 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Wed Jun 28 13:33:06 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/execute/HashJoinPlan.java | 102 +++++++++++-------- .../phoenix/execute/SortMergeJoinPlan.java | 32 +++++- 2 files changed, 89 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/75e909ec/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 64e2ce2..17c3cca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -293,39 +293,43 @@ public class HashJoinPlan extends DelegateQueryPlan { public ServerCache execute(HashJoinPlan parent) throws SQLException { List<Object> values = Lists.<Object> newArrayList(); ResultIterator iterator = plan.iterator(); - RowProjector projector = plan.getProjector(); - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - int columnCount = projector.getColumnCount(); - int rowCount = 0; - PDataType baseType = PVarbinary.INSTANCE; - for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) { - if (expectSingleRow && rowCount >= 1) - throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); - - if (columnCount == 1) { - ColumnProjector columnProjector = projector.getColumnProjector(0); - baseType = columnProjector.getExpression().getDataType(); - Object value = columnProjector.getValue(tuple, baseType, ptr); - values.add(value); - } else { - List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount); - for (int i = 0; i < columnCount; i++) { - ColumnProjector columnProjector = projector.getColumnProjector(i); - PDataType type = columnProjector.getExpression().getDataType(); - Object value = columnProjector.getValue(tuple, type, ptr); - expressions.add(LiteralExpression.newConstant(value, type)); + try { + RowProjector projector = plan.getProjector(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + int columnCount = projector.getColumnCount(); + int rowCount = 0; + PDataType baseType = PVarbinary.INSTANCE; + for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) { + if (expectSingleRow && rowCount >= 1) + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); + + if (columnCount == 1) { + ColumnProjector columnProjector = projector.getColumnProjector(0); + baseType = columnProjector.getExpression().getDataType(); + Object value = columnProjector.getValue(tuple, baseType, ptr); + values.add(value); + } else { + List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount); + for (int i = 0; i < columnCount; i++) { + ColumnProjector columnProjector = projector.getColumnProjector(i); + PDataType type = columnProjector.getExpression().getDataType(); + Object value = columnProjector.getValue(tuple, type, ptr); + expressions.add(LiteralExpression.newConstant(value, type)); + } + Expression expression = new RowValueConstructorExpression(expressions, true); + baseType = expression.getDataType(); + expression.evaluate(null, ptr); + values.add(baseType.toObject(ptr)); } - Expression expression = new RowValueConstructorExpression(expressions, true); - baseType = expression.getDataType(); - expression.evaluate(null, ptr); - values.add(baseType.toObject(ptr)); + rowCount++; } - rowCount++; + + Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, values.toArray()); + parent.getContext().setSubqueryResult(select, result); + return null; + } finally { + iterator.close(); } - - Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, values.toArray()); - parent.getContext().setSubqueryResult(select, result); - return null; } @Override @@ -383,21 +387,37 @@ public class HashJoinPlan extends DelegateQueryPlan { } ServerCache cache = null; if (hashExpressions != null) { - cache = parent.hashClient.addHashCache(ranges, plan.iterator(), - plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.delegate.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues); - long endTime = System.currentTimeMillis(); - boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); - if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { - LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.delegate.getContext().getConnection())); + ResultIterator iterator = plan.iterator(); + try { + cache = + parent.hashClient.addHashCache(ranges, iterator, + plan.getEstimatedSize(), hashExpressions, singleValueOnly, + parent.delegate.getTableRef(), keyRangeRhsExpression, + keyRangeRhsValues); + long endTime = System.currentTimeMillis(); + boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); + if (!isSet && (endTime + - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { + LOG.warn(addCustomAnnotations( + "Hash plan [" + index + + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", + parent.delegate.getContext().getConnection())); + } + } finally { + iterator.close(); } } else { - assert(keyRangeRhsExpression != null); + assert (keyRangeRhsExpression != null); ResultIterator iterator = plan.iterator(); - for (Tuple result = iterator.next(); result != null; result = iterator.next()) { - // Evaluate key expressions for hash join key range optimization. - keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression, result, plan.getContext().getTempPtr())); + try { + for (Tuple result = iterator.next(); result != null; result = iterator.next()) { + // Evaluate key expressions for hash join key range optimization. + keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression( + keyRangeRhsExpression, result, plan.getContext().getTempPtr())); + } + } finally { + iterator.close(); } - iterator.close(); } if (keyRangeRhsValues != null) { parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/75e909ec/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 4b63c50..568094a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -239,6 +239,26 @@ public class SortMergeJoinPlan implements QueryPlan { return false; } + private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) { + SQLException e = null; + try { + lhsIterator.close(); + } catch (Throwable e1) { + e = e1 instanceof SQLException ? (SQLException)e1 : new SQLException(e1); + } + try { + rhsIterator.close(); + } catch (Throwable e2) { + SQLException e22 = e2 instanceof SQLException ? (SQLException)e2 : new SQLException(e2); + if (e != null) { + e.setNextException(e22); + } else { + e = e22; + } + } + return e; + } + private class BasicJoinIterator implements ResultIterator { private final ResultIterator lhsIterator; private final ResultIterator rhsIterator; @@ -283,9 +303,11 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public void close() throws SQLException { - lhsIterator.close(); - rhsIterator.close(); + SQLException e = closeIterators(lhsIterator, rhsIterator); queue.close(); + if (e != null) { + throw e; + } } @Override @@ -453,8 +475,10 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public void close() throws SQLException { - lhsIterator.close(); - rhsIterator.close(); + SQLException e = closeIterators(lhsIterator, rhsIterator); + if (e != null) { + throw e; + } } @Override
