This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d48e362  PHOENIX-5793 Support parallel init and fast null return for 
SortMergeJoinPlan
d48e362 is described below

commit d48e3624d71427707759928830b42e1bd42cea90
Author: chenglei <cheng...@apache.org>
AuthorDate: Sat Jun 20 16:10:56 2020 +0800

    PHOENIX-5793 Support parallel init and fast null return for 
SortMergeJoinPlan
---
 .../phoenix/end2end/SortMergeJoinMoreIT.java       |  79 ++++++
 .../apache/phoenix/execute/SortMergeJoinPlan.java  | 242 +++++++++++++---
 .../apache/phoenix/execute/SortMergeJoinTest.java  | 304 +++++++++++++++++++++
 3 files changed, 585 insertions(+), 40 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
index e0a0632..e95a5fa 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -978,21 +978,25 @@ public class SortMergeJoinMoreIT extends 
ParallelStatsDisabledIT {
             conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (3,44)");
             conn.commit();
 
+            //test left semi join, rhs is null
             sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a where a.aid in  "+
                     "(select bid from "+tableName2+" where code > 70 limit 2)";
             ResultSet rs=conn.prepareStatement(sql).executeQuery();
             assertTrue(!rs.next());
 
+            //test left semi join,lhs is null
             sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a where a.aid in  "+
                     "(select bid from "+tableName2+" where code > 50 limit 2)";
             rs=conn.prepareStatement(sql).executeQuery();
             assertTrue(!rs.next());
 
+            //test left anti join,lhs is null
             sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a where a.aid not in  "+
                     "(select bid from "+tableName2+" where code > 50 limit 2)";
             rs=conn.prepareStatement(sql).executeQuery();
             assertTrue(!rs.next());
 
+            //test left anti join,rhs is null
             sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a where a.aid not in  "+
                     "(select bid from "+tableName2+" where code > 70 limit 2)";
             rs=conn.prepareStatement(sql).executeQuery();
@@ -1009,4 +1013,79 @@ public class SortMergeJoinMoreIT extends 
ParallelStatsDisabledIT {
             }
         }
     }
+
+    @Test
+    public void testSortMergeFastReturnNullBug5793() throws Exception {
+        Connection conn = null;
+        try {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+
+            String tableName1 = generateUniqueName();
+            String tableName2 = generateUniqueName();
+
+            String sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
+                    "AID INTEGER PRIMARY KEY,"+
+                    "AGE INTEGER"+
+                    ")";
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (1,11)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (2,22)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName1+"(AID,AGE) VALUES (3,33)");
+            conn.commit();
+
+            sql="CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+
+                    "BID INTEGER PRIMARY KEY,"+
+                    "CODE INTEGER"+
+                    ")";
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (1,66)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (2,55)");
+            conn.createStatement().execute("UPSERT INTO 
"+tableName2+"(BID,CODE) VALUES (3,44)");
+            conn.commit();
+
+            //test inner join, rhs is null
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a inner join  "+
+                    "(select bid,code from "+tableName2+" where code > 70 
limit 2) b on a.aid = b.bid";
+            ResultSet rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+
+            //test inner join,lhs is null
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a inner join  "+
+                    "(select bid from "+tableName2+" where code > 50 limit 2) 
b on a.aid = b.bid";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+
+            //test left join,lhs is null
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a left join  "+
+                    "(select bid from "+tableName2+" where code > 50 limit 2) 
b on a.aid = b.bid";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+
+            //test left join,rhs is null
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age >=11 and age<=33) a left join  "+
+                    "(select bid from "+tableName2+" where code > 70 limit 2) 
b on a.aid = b.bid";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 1);
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 2);
+            assertTrue(rs.next());
+            assertTrue(rs.getInt(1) == 3);
+            assertTrue(!rs.next());
+
+            //test left join, lhs return null and rhs return null.
+            sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age 
from "+tableName1+" where age > 40) a left join  "+
+                    "(select bid from "+tableName2+" where code > 70 limit 2) 
b on a.aid = b.bid";
+            rs=conn.prepareStatement(sql).executeQuery();
+            assertTrue(!rs.next());
+
+        } finally {
+            if(conn!=null) {
+                conn.close();
+            }
+        }
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 3211701..777ea47 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -28,6 +28,11 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -36,6 +41,7 @@ import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatelessExpressionCompiler;
@@ -73,18 +79,25 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class SortMergeJoinPlan implements QueryPlan {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SortMergeJoinPlan.class);
     private static final byte[] EMPTY_PTR = new byte[0];
     
     private final StatementContext context;
     private final FilterableStatement statement;
     private final TableRef table;
-    private final JoinType type;
+    /**
+     * In {@link QueryCompiler#compileJoinQuery},{@link JoinType#Right} is 
converted
+     * to {@link JoinType#Left}.
+     */
+    private final JoinType joinType;
     private final QueryPlan lhsPlan;
     private final QueryPlan rhsPlan;
     private final List<Expression> lhsKeyExpressions;
@@ -122,7 +135,7 @@ public class SortMergeJoinPlan implements QueryPlan {
         this.context = context;
         this.statement = statement;
         this.table = table;
-        this.type = type;
+        this.joinType = type;
         this.lhsPlan = lhsPlan;
         this.rhsPlan = rhsPlan;
         this.lhsKeyExpressions = lhsAndRhsKeyExpressions.getFirst();
@@ -170,7 +183,7 @@ public class SortMergeJoinPlan implements QueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {        
-        return type == JoinType.Semi || type == JoinType.Anti ? 
+        return joinType == JoinType.Semi || joinType == JoinType.Anti ?
                 new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), 
rhsPlan.iterator(scanGrouper)) :
                 new BasicJoinIterator(lhsPlan.iterator(scanGrouper), 
rhsPlan.iterator(scanGrouper));
     }
@@ -183,7 +196,7 @@ public class SortMergeJoinPlan implements QueryPlan {
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
         List<String> steps = Lists.newArrayList();
-        steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") 
TABLES");
+        steps.add("SORT-MERGE-JOIN (" + joinType.toString().toUpperCase() + ") 
TABLES");
         for (String step : lhsPlan.getExplainPlan().getPlanSteps()) {
             steps.add("    " + step);            
         }
@@ -277,7 +290,7 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     public JoinType getJoinType() {
-        return type;
+        return joinType;
     }
 
     private static SQLException closeIterators(ResultIterator lhsIterator, 
ResultIterator rhsIterator) {
@@ -300,7 +313,31 @@ public class SortMergeJoinPlan implements QueryPlan {
         return e;
     }
 
-    private class BasicJoinIterator implements ResultIterator {
+    /**
+     * Cancel the futures and shut down the threadPoolExecutor; any error raised while doing so is logged and ignored.
+     * @param threadPoolExecutor
+     * @param futures
+     */
+    private static void clearThreadPoolExecutor(
+            ExecutorService threadPoolExecutor,
+            List<Future<Boolean>> futures) {
+        for(Future<?> future : futures) {
+            try {
+                future.cancel(true);
+            } catch(Throwable ignore) {
+                LOGGER.error("cancel future error", ignore);
+            }
+        }
+
+        try {
+            threadPoolExecutor.shutdownNow();
+        } catch(Throwable ignore) {
+            LOGGER.error("shutdownNow threadPoolExecutor error", ignore);
+        }
+    }
+
+    @VisibleForTesting
+    public class BasicJoinIterator implements ResultIterator {
         private final ResultIterator lhsIterator;
         private final ResultIterator rhsIterator;
         private boolean initialized;
@@ -318,7 +355,8 @@ public class SortMergeJoinPlan implements QueryPlan {
         private byte[] emptyProjectedValue;
         private SizeAwareQueue<Tuple> queue;
         private Iterator<Tuple> queueIterator;
-        
+        private boolean joinResultNullBecauseOneSideNull = false;
+
         public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator 
rhsIterator) {
             this.lhsIterator = lhsIterator;
             this.rhsIterator = rhsIterator;
@@ -341,22 +379,30 @@ public class SortMergeJoinPlan implements QueryPlan {
             this.queue = PhoenixQueues.newTupleQueue(spoolingEnabled, 
thresholdBytes);
             this.queueIterator = null;
         }
-        
+
+        public boolean isJoinResultNullBecauseOneSideNull() {
+            return this.joinResultNullBecauseOneSideNull;
+        }
+
+        public boolean isInitialized() {
+            return this.initialized;
+        }
+
         @Override
         public void close() throws SQLException {
-            SQLException e = closeIterators(lhsIterator, rhsIterator);
+            SQLException sqlException = closeIterators(lhsIterator, 
rhsIterator);
             try {
               queue.close();
             } catch (IOException t) {
-              if (e != null) {
-                    e.setNextException(
+              if (sqlException != null) {
+                    sqlException.setNextException(
                         new SQLException("Also encountered exception while 
closing queue", t));
               } else {
-                e = new SQLException("Error while closing queue",t);
+                sqlException = new SQLException("Error while closing queue",t);
               }
             }
-            if (e != null) {
-                throw e;
+            if (sqlException != null) {
+                LOGGER.error("BasicJoinIterator close error!", sqlException);
             }
         }
 
@@ -366,6 +412,10 @@ public class SortMergeJoinPlan implements QueryPlan {
                 init();
             }
 
+            if(this.joinResultNullBecauseOneSideNull) {
+                return null;
+            }
+
             Tuple next = null;
             while (next == null && !isEnd()) {
                 if (queueIterator != null) {
@@ -404,12 +454,12 @@ public class SortMergeJoinPlan implements QueryPlan {
                             }
                             advance(false);
                         } else if (lhsKey.compareTo(rhsKey) < 0) {
-                            if (type == JoinType.Full || type == 
JoinType.Left) {
+                            if (joinType == JoinType.Full || joinType == 
JoinType.Left) {
                                 next = join(lhsTuple, null);
                             }
                             advance(true);
                         } else {
-                            if (type == JoinType.Full) {
+                            if (joinType == JoinType.Full) {
                                 next = join(null, rhsTuple);
                             }
                             advance(false);
@@ -430,21 +480,72 @@ public class SortMergeJoinPlan implements QueryPlan {
         @Override
         public void explain(List<String> planSteps) {
         }
-        
-        private void init() throws SQLException {
-            nextLhsTuple = lhsIterator.next();
-            if (nextLhsTuple != null) {
-                nextLhsKey.evaluate(nextLhsTuple);
+
+        private void doInit(boolean lhs) throws SQLException {
+            if(lhs) {
+                nextLhsTuple = lhsIterator.next();
+                if (nextLhsTuple != null) {
+                    nextLhsKey.evaluate(nextLhsTuple);
+                }
+                advance(true);
+            } else {
+                nextRhsTuple = rhsIterator.next();
+                if (nextRhsTuple != null) {
+                    nextRhsKey.evaluate(nextRhsTuple);
+                }
+                advance(false);
             }
-            advance(true);
-            nextRhsTuple = rhsIterator.next();
-            if (nextRhsTuple != null) {
-                nextRhsKey.evaluate(nextRhsTuple);
+        }
+
+        /**
+         * Parallel init, when:
+         * 1. {@link #lhsTuple} is null for inner join or left join.
+         * 2. {@link #rhsTuple} is null for inner join.
+         * we could conclude that the join result is null early, set {@link 
#joinResultNullBecauseOneSideNull} true.
+         * @throws SQLException
+         */
+        private void init() throws SQLException {
+            ExecutorService threadPoolExecutor = 
Executors.newFixedThreadPool(2);
+            ExecutorCompletionService<Boolean> executorCompletionService =
+                    new ExecutorCompletionService<Boolean>(threadPoolExecutor);
+            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(2);
+            futures.add(executorCompletionService.submit(new 
Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    doInit(true);
+                    return lhsTuple == null &&
+                            ((joinType == JoinType.Inner) || (joinType == 
JoinType.Left));
+                }
+            }));
+
+            futures.add(executorCompletionService.submit(new 
Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    doInit(false);
+                    return rhsTuple == null && joinType == JoinType.Inner;
+                }
+            }));
+
+            try {
+                Future<Boolean> future = executorCompletionService.take();
+                if(future.get()) {
+                    this.joinResultNullBecauseOneSideNull = true;
+                    this.initialized = true;
+                    return;
+                }
+
+                future = executorCompletionService.take();
+                if(future.get()) {
+                    this.joinResultNullBecauseOneSideNull = true;
+                }
+                initialized = true;
+            } catch (Throwable throwable) {
+                throw new SQLException("failed in init join iterators", 
throwable);
+            } finally {
+                clearThreadPoolExecutor(threadPoolExecutor, futures);
             }
-            advance(false);
-            initialized = true;
         }
-        
+
         private void advance(boolean lhs) throws SQLException {
             if (lhs) {
                 lhsTuple = nextLhsTuple;
@@ -472,8 +573,8 @@ public class SortMergeJoinPlan implements QueryPlan {
         }
         
         private boolean isEnd() {
-            return (lhsTuple == null && (rhsTuple == null || type != 
JoinType.Full))
-                    || (queueIterator == null && rhsTuple == null && type == 
JoinType.Inner);
+            return (lhsTuple == null && (rhsTuple == null || joinType != 
JoinType.Full))
+                    || (queueIterator == null && rhsTuple == null && joinType 
== JoinType.Inner);
         }        
         
         private Tuple join(Tuple lhs, Tuple rhs) throws SQLException {
@@ -514,12 +615,15 @@ public class SortMergeJoinPlan implements QueryPlan {
         private Tuple rhsTuple;
         private JoinKey lhsKey;
         private JoinKey rhsKey;
-        
+        private boolean joinResultNullBecauseOneSideNull = false;
+
         public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator 
rhsIterator) {
-            if (type != JoinType.Semi && type != JoinType.Anti) throw new 
IllegalArgumentException("Type " + type + " is not allowed by " + 
SemiAntiJoinIterator.class.getName());
+            if (joinType != JoinType.Semi && joinType != JoinType.Anti) {
+                throw new IllegalArgumentException("Type " + joinType + " is 
not allowed by " + SemiAntiJoinIterator.class.getName());
+            }
             this.lhsIterator = lhsIterator;
             this.rhsIterator = rhsIterator;
-            this.isSemi = type == JoinType.Semi;
+            this.isSemi = joinType == JoinType.Semi;
             this.initialized = false;
             this.lhsTuple = null;
             this.rhsTuple = null;
@@ -527,22 +631,80 @@ public class SortMergeJoinPlan implements QueryPlan {
             this.rhsKey = new JoinKey(rhsKeyExpressions);
         }
 
+        public boolean isJoinResultNullBecauseOneSideNull() {
+            return this.joinResultNullBecauseOneSideNull;
+        }
+
+        public boolean isInitialized() {
+            return this.initialized;
+        }
+
         @Override
         public void close() throws SQLException {
-            SQLException e = closeIterators(lhsIterator, rhsIterator);
-            if (e != null) {
-                throw e;
+            SQLException sqlException = closeIterators(lhsIterator, 
rhsIterator);
+            if (sqlException != null) {
+                LOGGER.error("SemiAntiJoinIterator close error!", 
sqlException);
+            }
+        }
+
+        /**
+         * Parallel init, when:
+         * 1. {@link #lhsTuple} is null.
+         * 2. {@link #rhsTuple} is null for left semi join.
+         * we could conclude that the join result is null early, set {@link 
#joinResultNullBecauseOneSideNull} true.
+         * @throws SQLException
+         */
+        private void init() throws SQLException {
+            ExecutorService threadPoolExecutor = 
Executors.newFixedThreadPool(2);
+            ExecutorCompletionService<Boolean> executorCompletionService =
+                    new ExecutorCompletionService<Boolean>(threadPoolExecutor);
+            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(2);
+            futures.add(executorCompletionService.submit(new 
Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    advance(true);
+                    return lhsTuple == null;
+                }
+            }));
+
+            futures.add(executorCompletionService.submit(new 
Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    advance(false);
+                    return (rhsTuple == null && isSemi);
+                }
+            }));
+
+            try {
+                Future<Boolean> future = executorCompletionService.take();
+                if(future.get()) {
+                    this.joinResultNullBecauseOneSideNull = true;
+                    this.initialized = true;
+                    return;
+                }
+
+                future = executorCompletionService.take();
+                if(future.get()) {
+                    this.joinResultNullBecauseOneSideNull = true;
+                }
+                initialized = true;
+            } catch (Throwable throwable) {
+                throw new SQLException("failed in init join iterators", 
throwable);
+            } finally {
+                clearThreadPoolExecutor(threadPoolExecutor, futures);
             }
         }
 
         @Override
         public Tuple next() throws SQLException {
             if (!initialized) {
-                advance(true);
-                advance(false);
-                initialized = true;
+                init();
             }
-            
+
+            if(this.joinResultNullBecauseOneSideNull) {
+                return null;
+            }
+
             Tuple next = null;            
             while (next == null && !isEnd()) {
                 if (rhsTuple != null) {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java
index 08a98e1..4a76bc5 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java
@@ -36,6 +36,9 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import static org.junit.Assert.assertTrue;
 
 
@@ -164,4 +167,305 @@ public class SortMergeJoinTest {
         assertTrue(resultTuple == null);
         assertTrue(semiAntiJoinIterator.isEnd());
     }
+
+    private final long INIT_LATENCY = 10 * 1000L;
+
+    @Test
+    public void testSortMergeFastReturnNullBug5793() throws SQLException, 
InterruptedException {
+        // mock for SortMergeJoinPlan
+        StatementContext statementContext = 
Mockito.mock(StatementContext.class);
+        PhoenixConnection phoenixConnection = 
Mockito.mock(PhoenixConnection.class);
+        when(statementContext.getConnection()).thenReturn(phoenixConnection);
+        ConnectionQueryServices connectionQueryServices = 
Mockito.mock(ConnectionQueryServices.class);
+        
when(connectionQueryServices.getProps()).thenReturn(ReadOnlyProps.EMPTY_PROPS);
+        
when(phoenixConnection.getQueryServices()).thenReturn(connectionQueryServices);
+
+        List<Expression> expressions = new ArrayList<Expression>();
+        Pair<List<Expression>,List<Expression>> lhsAndRhsJoinExpressions = 
Pair.newPair(expressions, expressions);
+        Pair<List<OrderByNode>, List<OrderByNode>> lhsAndRhsOrderByNodes = 
Pair.<List<OrderByNode>, List<OrderByNode>> newPair(
+                new ArrayList<OrderByNode>(),
+                new ArrayList<OrderByNode>());
+
+        //test inner join, lhs long latency and rhs return null.
+        JoinTableNode.JoinType joinType = JoinTableNode.JoinType.Inner;
+        ResultIterator lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenAnswer(longLatencyInit());
+        QueryPlan lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        QueryPlan rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        ResultIterator rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenReturn(null);
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        SortMergeJoinPlan sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        SortMergeJoinPlan.BasicJoinIterator sortMergeJoinResultIterator =
+                
(SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator();
+
+        long startTime = System.currentTimeMillis();
+        Tuple resultTuple = sortMergeJoinResultIterator.next();
+        long elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+
+        //test inner join, lhs return null and rhs long latency.
+        joinType = JoinTableNode.JoinType.Inner;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenAnswer(longLatencyInit());
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        sortMergeJoinResultIterator =
+                
(SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+        //test left join, lhs return null and rhs long latency.
+        joinType = JoinTableNode.JoinType.Left;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenAnswer(longLatencyInit());
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        sortMergeJoinResultIterator = 
(SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+        //test full join, lhs return null and rhs return null.
+        joinType = JoinTableNode.JoinType.Full;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenReturn(null);
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        sortMergeJoinResultIterator = 
(SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(!sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+        //test left semi join, lhs return null and rhs long latency.
+        joinType = JoinTableNode.JoinType.Semi;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenAnswer(longLatencyInit());
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        SortMergeJoinPlan.SemiAntiJoinIterator 
sortMergeJoinSemiAntiResultIterator =
+                
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinSemiAntiResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+        //test left semi join, lhs long latency and rhs return null.
+        joinType = JoinTableNode.JoinType.Semi;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenAnswer(longLatencyInit());
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenReturn(null);
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        sortMergeJoinSemiAntiResultIterator = 
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinSemiAntiResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+
+        //test left anti join, lhs return null and rhs long latency.
+        joinType = JoinTableNode.JoinType.Anti;
+        lhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(lhsResultIterator.next()).thenReturn(null);
+        lhsQueryPlan = Mockito.mock(QueryPlan.class);
+        
when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator);
+
+        rhsQueryPlan = Mockito.mock(QueryPlan.class);
+        rhsResultIterator = Mockito.mock(ResultIterator.class);
+        when(rhsResultIterator.next()).thenAnswer(longLatencyInit());
+        
when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator);
+
+        sortMergeJoinPlan = new SortMergeJoinPlan(
+                statementContext,
+                null,
+                null,
+                joinType,
+                lhsQueryPlan,
+                rhsQueryPlan,
+                lhsAndRhsJoinExpressions,
+                expressions,
+                null,
+                null,
+                null,
+                0,
+                true,
+                lhsAndRhsOrderByNodes);
+        sortMergeJoinSemiAntiResultIterator =
+                
(SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator();
+
+        startTime = System.currentTimeMillis();
+        resultTuple = sortMergeJoinSemiAntiResultIterator.next();
+        elapsed = System.currentTimeMillis() - startTime;
+
+        assertTrue(resultTuple == null);
+        
assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull());
+        assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized());
+        assertTrue(elapsed < INIT_LATENCY);
+    }
+
+    /**
+     * Builds a Mockito {@link Answer} that imitates a slow fetch: the calling thread
+     * sleeps for {@code INIT_LATENCY} milliseconds and is then handed a newly created
+     * mock {@link Tuple}.
+     *
+     * @return an answer that delays first and then yields a mock tuple
+     */
+    private Answer<Tuple> longLatencyInit() {
+        Answer<Tuple> delayedAnswer = new Answer<Tuple>() {
+            @Override
+            public Tuple answer(InvocationOnMock ignoredInvocation) throws InterruptedException {
+                // Stall before answering so the stubbed call is slow to produce its tuple.
+                Thread.sleep(INIT_LATENCY);
+                return Mockito.mock(Tuple.class);
+            }
+        };
+        return delayedAnswer;
+    }
 }

Reply via email to