This is an automated email from the ASF dual-hosted git repository. chenglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 11c2492 PHOENIX-5956 Optimize LeftSemiJoin For SortMergeJoin 11c2492 is described below commit 11c24925ec29f91970c36322cf3d7a00a32efc40 Author: chenglei <cheng...@apache.org> AuthorDate: Thu Jun 18 18:10:24 2020 +0800 PHOENIX-5956 Optimize LeftSemiJoin For SortMergeJoin --- .../phoenix/end2end/SortMergeJoinMoreIT.java | 64 ++++++++ .../apache/phoenix/execute/SortMergeJoinPlan.java | 18 ++- .../apache/phoenix/execute/SortMergeJoinTest.java | 167 +++++++++++++++++++++ 3 files changed, 246 insertions(+), 3 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java index bbaa68a..e0a0632 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java @@ -945,4 +945,68 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT { } } } + + @Test + public void testOptimizeLeftSemiJoinForSortMergeJoinBug5956() throws Exception { + Connection conn = null; + try { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + + String tableName1 = generateUniqueName(); + String tableName2 = generateUniqueName(); + + String sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+ + "AID INTEGER PRIMARY KEY,"+ + "AGE INTEGER"+ + ")"; + conn.createStatement().execute(sql); + + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (1,11)"); + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (2,22)"); + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (3,33)"); + conn.commit(); + + sql="CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+ + "BID INTEGER PRIMARY KEY,"+ + "CODE INTEGER"+ + ")"; + conn.createStatement().execute(sql); + + 
conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (1,66)"); + conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (2,55)"); + conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (3,44)"); + conn.commit(); + + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a where a.aid in "+ + "(select bid from "+tableName2+" where code > 70 limit 2)"; + ResultSet rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a where a.aid in "+ + "(select bid from "+tableName2+" where code > 50 limit 2)"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a where a.aid not in "+ + "(select bid from "+tableName2+" where code > 50 limit 2)"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a where a.aid not in "+ + "(select bid from "+tableName2+" where code > 70 limit 2)"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 1); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 2); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 3); + assertTrue(!rs.next()); + } finally { + if(conn!=null) { + conn.close(); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 42a18b0..3211701 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -74,6 +74,7 @@ import 
org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.SchemaUtil; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -502,8 +503,9 @@ public class SortMergeJoinPlan implements QueryPlan { } } } - - private class SemiAntiJoinIterator implements ResultIterator { + + @VisibleForTesting + public class SemiAntiJoinIterator implements ResultIterator { private final ResultIterator lhsIterator; private final ResultIterator rhsIterator; private final boolean isSemi; @@ -542,7 +544,7 @@ public class SortMergeJoinPlan implements QueryPlan { } Tuple next = null; - while (lhsTuple != null && next == null) { + while (next == null && !isEnd()) { if (rhsTuple != null) { if (lhsKey.equals(rhsKey)) { if (isSemi) { @@ -568,6 +570,16 @@ public class SortMergeJoinPlan implements QueryPlan { return next; } + /** + * Returns true when {@link #next} can stop scanning: the LHS is exhausted, or, for a + * semi join, the RHS is exhausted so no remaining LHS tuple can match. + */ + @VisibleForTesting + public boolean isEnd() { + return (this.lhsTuple == null) || + (this.rhsTuple == null && this.isSemi); + } + @Override public void explain(List<String> planSteps) { } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java new file mode 100644 index 0000000..08a98e1 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import static org.mockito.Mockito.when; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.iterate.DefaultParallelScanGrouper; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.parse.JoinTableNode; +import org.apache.phoenix.parse.OrderByNode; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Test; +import org.mockito.Mockito; +import static org.junit.Assert.assertTrue; + + +public class SortMergeJoinTest { + + + @Test + public void testOptimizeSemiJoinForSortMergeJoinBug5956() throws SQLException, InterruptedException { + // mock for SortMergeJoinPlan + StatementContext statementContext = Mockito.mock(StatementContext.class); + PhoenixConnection phoenixConnection = Mockito.mock(PhoenixConnection.class); + when(statementContext.getConnection()).thenReturn(phoenixConnection); + ConnectionQueryServices connectionQueryServices = Mockito.mock(ConnectionQueryServices.class); + when(connectionQueryServices.getProps()).thenReturn(ReadOnlyProps.EMPTY_PROPS); + when(phoenixConnection.getQueryServices()).thenReturn(connectionQueryServices); + + List<Expression> expressions =
new ArrayList<Expression>(); + Pair<List<Expression>,List<Expression>> lhsAndRhsJoinExpressions = Pair.newPair(expressions, expressions); + Pair<List<OrderByNode>, List<OrderByNode>> lhsAndRhsOrderByNodes = Pair.<List<OrderByNode>, List<OrderByNode>> newPair( + new ArrayList<OrderByNode>(), + new ArrayList<OrderByNode>()); + + //test semi join rhs is null + JoinTableNode.JoinType joinType = JoinTableNode.JoinType.Semi; + ResultIterator lhsResultIterator = Mockito.mock(ResultIterator.class); + Tuple tuple = Mockito.mock(Tuple.class); + when(lhsResultIterator.next()).thenReturn(tuple); + QueryPlan lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + QueryPlan rhsQueryPlan = Mockito.mock(QueryPlan.class); + ResultIterator rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenReturn(null); + when(rhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + SortMergeJoinPlan sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + SortMergeJoinPlan.SemiAntiJoinIterator semiAntiJoinIterator = + (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + Tuple resultTuple = semiAntiJoinIterator.next(); + assertTrue(resultTuple == null); + assertTrue(semiAntiJoinIterator.isEnd()); + + //test semi join lhs is null + joinType = JoinTableNode.JoinType.Semi; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = 
Mockito.mock(ResultIterator.class); + tuple = Mockito.mock(Tuple.class); + when(rhsResultIterator.next()).thenReturn(tuple); + when(rhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + semiAntiJoinIterator = (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + resultTuple = semiAntiJoinIterator.next(); + assertTrue(resultTuple == null); + assertTrue(semiAntiJoinIterator.isEnd()); + + //test anti join lhs is null + joinType = JoinTableNode.JoinType.Anti; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + tuple = Mockito.mock(Tuple.class); + when(rhsResultIterator.next()).thenReturn(tuple); + when(rhsQueryPlan.iterator( + DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + semiAntiJoinIterator = (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + resultTuple = semiAntiJoinIterator.next(); + assertTrue(resultTuple == null); + assertTrue(semiAntiJoinIterator.isEnd()); + } +}