This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 11c2492  PHOENIX-5956 Optimize LeftSemiJoin For SortMergeJoin
11c2492 is described below

commit 11c24925ec29f91970c36322cf3d7a00a32efc40
Author: chenglei <cheng...@apache.org>
AuthorDate: Thu Jun 18 18:10:24 2020 +0800

    PHOENIX-5956 Optimize LeftSemiJoin For SortMergeJoin
---
 .../phoenix/end2end/SortMergeJoinMoreIT.java       |  64 ++++++++
 .../apache/phoenix/execute/SortMergeJoinPlan.java  |  18 ++-
 .../apache/phoenix/execute/SortMergeJoinTest.java  | 167 +++++++++++++++++++++
 3 files changed, 246 insertions(+), 3 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
index bbaa68a..e0a0632 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -945,4 +945,68 @@ public class SortMergeJoinMoreIT extends 
ParallelStatsDisabledIT {
             }
         }
     }
+    /**
+     * Regression test for PHOENIX-5956 ("Optimize LeftSemiJoin For SortMergeJoin").
+     * Runs IN and NOT IN subqueries under the USE_SORT_MERGE_JOIN hint in which
+     * either the outer query or the subquery selects no rows, and checks the
+     * returned AIDs. NOTE(review): IN / NOT IN are presumably planned as
+     * sort-merge semi / anti joins here; confirm with EXPLAIN if this changes.
+     */
+    @Test
+    public void testOptimizeLeftSemiJoinForSortMergeJoinBug5956() throws 
Exception {
+        Connection conn = null;
+        try {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+
+            String tableName1 = generateUniqueName();
+            String tableName2 = generateUniqueName();
+            // lhs table: (AID, AGE) = (1, 11), (2, 22), (3, 33).
+            String sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
+                    "AID INTEGER PRIMARY KEY,"+
+                    "AGE INTEGER"+
+                    ")";
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (1,11)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (2,22)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (3,33)");
+            conn.commit();
+            // rhs table: (BID, CODE) = (1, 66), (2, 55), (3, 44).
+            sql="CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+
+                    "BID INTEGER PRIMARY KEY,"+
+                    "CODE INTEGER"+
+                    ")";
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (1,66)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (2,55)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (3,44)");
+            conn.commit();
+            // IN with an empty subquery (no CODE > 70): no rows expected.
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a where a.aid in  "+
+                    "(select bid from "+tableName2+" where code > 70 limit 2)";
+            ResultSet rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+            // IN with an empty outer query (no AGE > 40): no rows expected.
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a where a.aid in  "+
+                    "(select bid from "+tableName2+" where code > 50 limit 2)";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+            // NOT IN with an empty outer query (no AGE > 40): no rows expected.
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a where a.aid not in  "+
+                    "(select bid from "+tableName2+" where code > 50 limit 2)";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+            // NOT IN with an empty subquery (no CODE > 70): AIDs 1, 2, 3 are all returned.
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a where a.aid not in  "+
+                    "(select bid from "+tableName2+" where code > 70 limit 2)";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 1);
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 2);
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 3);
+            assertTrue(!rs.next());
+        } finally {
+            if(conn!=null) {
+                conn.close();
+            }
+        }
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 42a18b0..3211701 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -74,6 +74,7 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SchemaUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -502,8 +503,9 @@ public class SortMergeJoinPlan implements QueryPlan {
             }
         }
     }
-    
-    private class SemiAntiJoinIterator implements ResultIterator {
+
+    @VisibleForTesting
+    public class SemiAntiJoinIterator implements ResultIterator {
         private final ResultIterator lhsIterator;
         private final ResultIterator rhsIterator;
         private final boolean isSemi;
@@ -542,7 +544,7 @@ public class SortMergeJoinPlan implements QueryPlan {
             }
             
             Tuple next = null;            
-            while (lhsTuple != null && next == null) {
+            while (next == null && !isEnd()) {
                 if (rhsTuple != null) {
                     if (lhsKey.equals(rhsKey)) {
                         if (isSemi) {
@@ -568,6 +570,16 @@ public class SortMergeJoinPlan implements QueryPlan {
             return next;
         }
 
+        /**
+         * Returns whether {@link #next} can stop looking for another result tuple.
+         * Iteration always ends once the lhs side is exhausted ({@code lhsTuple}
+         * is null). A semi join also ends as soon as the rhs side is exhausted
+         * ({@code rhsTuple} is null), because a semi join only emits lhs tuples
+         * whose key matches an rhs key. An anti join keeps going in that case,
+         * since the remaining lhs tuples have no match and must still be emitted.
+         *
+         * @return {@code true} if {@link #next} may stop without examining
+         *         further tuples
+         */
+        @VisibleForTesting
+        public boolean isEnd() {
+            return (this.lhsTuple == null) ||
+                   (this.rhsTuple == null && this.isSemi);
+        }
+
         @Override
         public void explain(List<String> planSteps) {
         }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java
new file mode 100644
index 0000000..08a98e1
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.JoinTableNode;
+import org.apache.phoenix.parse.OrderByNode;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Test;
+import org.mockito.Mockito;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SortMergeJoinPlan}'s semi/anti join iterator. The tests
+ * use Mockito mocks in place of real query plans and result iterators.
+ */
+public class SortMergeJoinTest {
+
+    /**
+     * Regression test for PHOENIX-5956. It checks that
+     * {@link SortMergeJoinPlan.SemiAntiJoinIterator#next()} returns null and that
+     * {@link SortMergeJoinPlan.SemiAntiJoinIterator#isEnd()} reports true in three
+     * cases: (1) a semi join whose rhs is empty, (2) a semi join whose lhs is
+     * empty, and (3) an anti join whose lhs is empty.
+     */
+    @Test
+    public void testOptimizeSemiJoinForSortMergeJoinBug5956() throws 
SQLException, InterruptedException {
+        /*
+         * Mocks needed to construct a SortMergeJoinPlan. The chain
+         * StatementContext -> PhoenixConnection -> ConnectionQueryServices is
+         * stubbed so that getProps() returns empty props.
+         */
+        StatementContext statementContext = 
Mockito.mock(StatementContext.class);
+        PhoenixConnection phoenixConnection = 
Mockito.mock(PhoenixConnection.class);
+        when(statementContext.getConnection()).thenReturn(phoenixConnection);
+        ConnectionQueryServices connectionQueryServices = 
Mockito.mock(ConnectionQueryServices.class);
+        
when(connectionQueryServices.getProps()).thenReturn(ReadOnlyProps.EMPTY_PROPS);
+        
when(phoenixConnection.getQueryServices()).thenReturn(connectionQueryServices);
+
+        List<Expression> expressions = new ArrayList<Expression>();
+        Pair<List<Expression>,List<Expression>> lhsAndRhsJoinExpressions = 
Pair.newPair(expressions, expressions);
+        Pair<List<OrderByNode>, List<OrderByNode>> lhsAndRhsOrderByNodes = 
Pair.<List<OrderByNode>, List<OrderByNode>> newPair(
+                new ArrayList<OrderByNode>(),
+                new ArrayList<OrderByNode>());
+
+        /*
+         * Case 1: semi join with an exhausted rhs. The rhs mock's next() returns
+         * null. The lhs mock's next() returns the same non-null tuple on every
+         * call, so the lhs side never runs out. As a result, both the null result
+         * and isEnd() == true depend on isEnd() treating an exhausted rhs as the
+         * end of a semi join.
+         */
+        JoinTableNode.JoinType joinType = JoinTableNode.JoinType.Semi;
+        ResultIterator lhsResultIterator = Mockito.mock(ResultIterator.class);
+        Tuple tuple = Mockito.mock(Tuple.class);
+        when(lhsResultIterator.next()).thenReturn(tuple);
+        QueryPlan lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        when(lhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        QueryPlan rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        ResultIterator rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenReturn(null);
+        when(rhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        SortMergeJoinPlan sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        SortMergeJoinPlan.SemiAntiJoinIterator semiAntiJoinIterator =
+                
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); // NOTE(review): relies on iterator() returning a SemiAntiJoinIterator for Semi/Anti join types
+        Tuple resultTuple = semiAntiJoinIterator.next();
+        assertTrue(resultTuple == null);
+        assertTrue(semiAntiJoinIterator.isEnd());
+
+        /*
+         * Case 2: semi join with an exhausted lhs. The lhs mock's next() returns
+         * null and the rhs mock's next() returns a non-null tuple. isEnd() is
+         * true because lhsTuple is null.
+         */
+        joinType = JoinTableNode.JoinType.Semi;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        when(lhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        tuple = Mockito.mock(Tuple.class);
+        when(rhsResultIterator.next()).thenReturn(tuple);
+        when(rhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        semiAntiJoinIterator = 
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator();
+        resultTuple = semiAntiJoinIterator.next();
+        assertTrue(resultTuple == null);
+        assertTrue(semiAntiJoinIterator.isEnd());
+
+        /*
+         * Case 3: anti join with an exhausted lhs. The mocks are the same as in
+         * case 2. isEnd() is true because lhsTuple is null, and an exhausted lhs
+         * ends both semi and anti joins.
+         */
+        joinType = JoinTableNode.JoinType.Anti;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        when(lhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        tuple = Mockito.mock(Tuple.class);
+        when(rhsResultIterator.next()).thenReturn(tuple);
+        when(rhsQueryPlan.iterator(
+                
DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        semiAntiJoinIterator = 
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator();
+        resultTuple = semiAntiJoinIterator.next();
+        assertTrue(resultTuple == null);
+        assertTrue(semiAntiJoinIterator.isEnd());
+    }
+}

Reply via email to