This is an automated email from the ASF dual-hosted git repository. chenglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new d48e362 PHOENIX-5793 Support parallel init and fast null return for SortMergeJoinPlan d48e362 is described below commit d48e3624d71427707759928830b42e1bd42cea90 Author: chenglei <cheng...@apache.org> AuthorDate: Sat Jun 20 16:10:56 2020 +0800 PHOENIX-5793 Support parallel init and fast null return for SortMergeJoinPlan --- .../phoenix/end2end/SortMergeJoinMoreIT.java | 79 ++++++ .../apache/phoenix/execute/SortMergeJoinPlan.java | 242 +++++++++++++--- .../apache/phoenix/execute/SortMergeJoinTest.java | 304 +++++++++++++++++++++ 3 files changed, 585 insertions(+), 40 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java index e0a0632..e95a5fa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java @@ -978,21 +978,25 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT { conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (3,44)"); conn.commit(); + //test left semi join, rhs is null sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a where a.aid in "+ "(select bid from "+tableName2+" where code > 70 limit 2)"; ResultSet rs=conn.prepareStatement(sql).executeQuery(); assertTrue(!rs.next()); + //test left semi join,lhs is null sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a where a.aid in "+ "(select bid from "+tableName2+" where code > 50 limit 2)"; rs=conn.prepareStatement(sql).executeQuery(); assertTrue(!rs.next()); + //test left anti join,lhs is null sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a where a.aid not in "+ "(select bid from 
"+tableName2+" where code > 50 limit 2)"; rs=conn.prepareStatement(sql).executeQuery(); assertTrue(!rs.next()); + //test left anti join,rhs is null sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a where a.aid not in "+ "(select bid from "+tableName2+" where code > 70 limit 2)"; rs=conn.prepareStatement(sql).executeQuery(); @@ -1009,4 +1013,79 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT { } } } + + @Test + public void testSortMergeFastReturnNullBug5793() throws Exception { + Connection conn = null; + try { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + + String tableName1 = generateUniqueName(); + String tableName2 = generateUniqueName(); + + String sql="CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+ + "AID INTEGER PRIMARY KEY,"+ + "AGE INTEGER"+ + ")"; + conn.createStatement().execute(sql); + + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (1,11)"); + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (2,22)"); + conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (3,33)"); + conn.commit(); + + sql="CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+ + "BID INTEGER PRIMARY KEY,"+ + "CODE INTEGER"+ + ")"; + conn.createStatement().execute(sql); + + conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (1,66)"); + conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (2,55)"); + conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (3,44)"); + conn.commit(); + + //test inner join, rhs is null + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a inner join "+ + "(select bid,code from "+tableName2+" where code > 70 limit 2) b on a.aid = b.bid"; + ResultSet rs=conn.prepareStatement(sql).executeQuery(); + 
assertTrue(!rs.next()); + + //test inner join,lhs is null + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a inner join "+ + "(select bid from "+tableName2+" where code > 50 limit 2) b on a.aid = b.bid"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + //test left join,lhs is null + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a left join "+ + "(select bid from "+tableName2+" where code > 50 limit 2) b on a.aid = b.bid"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + //test left join,rhs is null + sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age >=11 and age<=33) a left join "+ + "(select bid from "+tableName2+" where code > 70 limit 2) b on a.aid = b.bid"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 1); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 2); + assertTrue(rs.next()); + assertTrue(rs.getInt(1) == 3); + assertTrue(!rs.next()); + + //test full join, lhs return null and rhs return null. 
+ sql="select /*+ USE_SORT_MERGE_JOIN */ a.aid from (select aid,age from "+tableName1+" where age > 40) a left join "+ + "(select bid from "+tableName2+" where code > 70 limit 2) b on a.aid = b.bid"; + rs=conn.prepareStatement(sql).executeQuery(); + assertTrue(!rs.next()); + + } finally { + if(conn!=null) { + conn.close(); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 3211701..777ea47 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -28,6 +28,11 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -36,6 +41,7 @@ import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatelessExpressionCompiler; @@ -73,18 +79,25 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; public class 
SortMergeJoinPlan implements QueryPlan { + private static final Logger LOGGER = LoggerFactory.getLogger(SortMergeJoinPlan.class); private static final byte[] EMPTY_PTR = new byte[0]; private final StatementContext context; private final FilterableStatement statement; private final TableRef table; - private final JoinType type; + /** + * In {@link QueryCompiler#compileJoinQuery},{@link JoinType#Right} is converted + * to {@link JoinType#Left}. + */ + private final JoinType joinType; private final QueryPlan lhsPlan; private final QueryPlan rhsPlan; private final List<Expression> lhsKeyExpressions; @@ -122,7 +135,7 @@ public class SortMergeJoinPlan implements QueryPlan { this.context = context; this.statement = statement; this.table = table; - this.type = type; + this.joinType = type; this.lhsPlan = lhsPlan; this.rhsPlan = rhsPlan; this.lhsKeyExpressions = lhsAndRhsKeyExpressions.getFirst(); @@ -170,7 +183,7 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - return type == JoinType.Semi || type == JoinType.Anti ? + return joinType == JoinType.Semi || joinType == JoinType.Anti ? 
new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) : new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)); } @@ -183,7 +196,7 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public ExplainPlan getExplainPlan() throws SQLException { List<String> steps = Lists.newArrayList(); - steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES"); + steps.add("SORT-MERGE-JOIN (" + joinType.toString().toUpperCase() + ") TABLES"); for (String step : lhsPlan.getExplainPlan().getPlanSteps()) { steps.add(" " + step); } @@ -277,7 +290,7 @@ public class SortMergeJoinPlan implements QueryPlan { } public JoinType getJoinType() { - return type; + return joinType; } private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator rhsIterator) { @@ -300,7 +313,31 @@ public class SortMergeJoinPlan implements QueryPlan { return e; } - private class BasicJoinIterator implements ResultIterator { + /** + * close the futures and threadPoolExecutor,ignore exception. 
+ * @param threadPoolExecutor + * @param futures + */ + private static void clearThreadPoolExecutor( + ExecutorService threadPoolExecutor, + List<Future<Boolean>> futures) { + for(Future<?> future : futures) { + try { + future.cancel(true); + } catch(Throwable ignore) { + LOGGER.error("cancel future error", ignore); + } + } + + try { + threadPoolExecutor.shutdownNow(); + } catch(Throwable ignore) { + LOGGER.error("shutdownNow threadPoolExecutor error", ignore); + } + } + + @VisibleForTesting + public class BasicJoinIterator implements ResultIterator { private final ResultIterator lhsIterator; private final ResultIterator rhsIterator; private boolean initialized; @@ -318,7 +355,8 @@ public class SortMergeJoinPlan implements QueryPlan { private byte[] emptyProjectedValue; private SizeAwareQueue<Tuple> queue; private Iterator<Tuple> queueIterator; - + private boolean joinResultNullBecauseOneSideNull = false; + public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { this.lhsIterator = lhsIterator; this.rhsIterator = rhsIterator; @@ -341,22 +379,30 @@ public class SortMergeJoinPlan implements QueryPlan { this.queue = PhoenixQueues.newTupleQueue(spoolingEnabled, thresholdBytes); this.queueIterator = null; } - + + public boolean isJoinResultNullBecauseOneSideNull() { + return this.joinResultNullBecauseOneSideNull; + } + + public boolean isInitialized() { + return this.initialized; + } + @Override public void close() throws SQLException { - SQLException e = closeIterators(lhsIterator, rhsIterator); + SQLException sqlException = closeIterators(lhsIterator, rhsIterator); try { queue.close(); } catch (IOException t) { - if (e != null) { - e.setNextException( + if (sqlException != null) { + sqlException.setNextException( new SQLException("Also encountered exception while closing queue", t)); } else { - e = new SQLException("Error while closing queue",t); + sqlException = new SQLException("Error while closing queue",t); } } - if (e != null) { - throw 
e; + if (sqlException != null) { + LOGGER.error("BasicJoinIterator close error!", sqlException); } } @@ -366,6 +412,10 @@ public class SortMergeJoinPlan implements QueryPlan { init(); } + if(this.joinResultNullBecauseOneSideNull) { + return null; + } + Tuple next = null; while (next == null && !isEnd()) { if (queueIterator != null) { @@ -404,12 +454,12 @@ public class SortMergeJoinPlan implements QueryPlan { } advance(false); } else if (lhsKey.compareTo(rhsKey) < 0) { - if (type == JoinType.Full || type == JoinType.Left) { + if (joinType == JoinType.Full || joinType == JoinType.Left) { next = join(lhsTuple, null); } advance(true); } else { - if (type == JoinType.Full) { + if (joinType == JoinType.Full) { next = join(null, rhsTuple); } advance(false); @@ -430,21 +480,72 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public void explain(List<String> planSteps) { } - - private void init() throws SQLException { - nextLhsTuple = lhsIterator.next(); - if (nextLhsTuple != null) { - nextLhsKey.evaluate(nextLhsTuple); + + private void doInit(boolean lhs) throws SQLException { + if(lhs) { + nextLhsTuple = lhsIterator.next(); + if (nextLhsTuple != null) { + nextLhsKey.evaluate(nextLhsTuple); + } + advance(true); + } else { + nextRhsTuple = rhsIterator.next(); + if (nextRhsTuple != null) { + nextRhsKey.evaluate(nextRhsTuple); + } + advance(false); } - advance(true); - nextRhsTuple = rhsIterator.next(); - if (nextRhsTuple != null) { - nextRhsKey.evaluate(nextRhsTuple); + } + + /** + * Parallel init, when: + * 1. {@link #lhsTuple} is null for inner join or left join. + * 2. {@link #rhsTuple} is null for inner join. + * we could conclude that the join result is null early, set {@link #joinResultNullBecauseOneSideNull} true. 
+ * @throws SQLException + */ + private void init() throws SQLException { + ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); + ExecutorCompletionService<Boolean> executorCompletionService = + new ExecutorCompletionService<Boolean>(threadPoolExecutor); + List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(2); + futures.add(executorCompletionService.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + doInit(true); + return lhsTuple == null && + ((joinType == JoinType.Inner) || (joinType == JoinType.Left)); + } + })); + + futures.add(executorCompletionService.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + doInit(false); + return rhsTuple == null && joinType == JoinType.Inner; + } + })); + + try { + Future<Boolean> future = executorCompletionService.take(); + if(future.get()) { + this.joinResultNullBecauseOneSideNull = true; + this.initialized = true; + return; + } + + future = executorCompletionService.take(); + if(future.get()) { + this.joinResultNullBecauseOneSideNull = true; + } + initialized = true; + } catch (Throwable throwable) { + throw new SQLException("failed in init join iterators", throwable); + } finally { + clearThreadPoolExecutor(threadPoolExecutor, futures); } - advance(false); - initialized = true; } - + private void advance(boolean lhs) throws SQLException { if (lhs) { lhsTuple = nextLhsTuple; @@ -472,8 +573,8 @@ public class SortMergeJoinPlan implements QueryPlan { } private boolean isEnd() { - return (lhsTuple == null && (rhsTuple == null || type != JoinType.Full)) - || (queueIterator == null && rhsTuple == null && type == JoinType.Inner); + return (lhsTuple == null && (rhsTuple == null || joinType != JoinType.Full)) + || (queueIterator == null && rhsTuple == null && joinType == JoinType.Inner); } private Tuple join(Tuple lhs, Tuple rhs) throws SQLException { @@ -514,12 +615,15 @@ public class SortMergeJoinPlan implements QueryPlan { 
private Tuple rhsTuple; private JoinKey lhsKey; private JoinKey rhsKey; - + private boolean joinResultNullBecauseOneSideNull = false; + public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { - if (type != JoinType.Semi && type != JoinType.Anti) throw new IllegalArgumentException("Type " + type + " is not allowed by " + SemiAntiJoinIterator.class.getName()); + if (joinType != JoinType.Semi && joinType != JoinType.Anti) { + throw new IllegalArgumentException("Type " + joinType + " is not allowed by " + SemiAntiJoinIterator.class.getName()); + } this.lhsIterator = lhsIterator; this.rhsIterator = rhsIterator; - this.isSemi = type == JoinType.Semi; + this.isSemi = joinType == JoinType.Semi; this.initialized = false; this.lhsTuple = null; this.rhsTuple = null; @@ -527,22 +631,80 @@ public class SortMergeJoinPlan implements QueryPlan { this.rhsKey = new JoinKey(rhsKeyExpressions); } + public boolean isJoinResultNullBecauseOneSideNull() { + return this.joinResultNullBecauseOneSideNull; + } + + public boolean isInitialized() { + return this.initialized; + } + @Override public void close() throws SQLException { - SQLException e = closeIterators(lhsIterator, rhsIterator); - if (e != null) { - throw e; + SQLException sqlException = closeIterators(lhsIterator, rhsIterator); + if (sqlException != null) { + LOGGER.error("SemiAntiJoinIterator close error!", sqlException); + } + } + + /** + * Parallel init, when: + * 1. {@link #lhsTuple} is null. + * 2. {@link #rhsTuple} is null for left semi join. + * we could conclude that the join result is null early, set {@link #joinResultNullBecauseOneSideNull} true. 
+ * @throws SQLException + */ + private void init() throws SQLException { + ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); + ExecutorCompletionService<Boolean> executorCompletionService = + new ExecutorCompletionService<Boolean>(threadPoolExecutor); + List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(2); + futures.add(executorCompletionService.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + advance(true); + return lhsTuple == null; + } + })); + + futures.add(executorCompletionService.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + advance(false); + return (rhsTuple == null && isSemi); + } + })); + + try { + Future<Boolean> future = executorCompletionService.take(); + if(future.get()) { + this.joinResultNullBecauseOneSideNull = true; + this.initialized = true; + return; + } + + future = executorCompletionService.take(); + if(future.get()) { + this.joinResultNullBecauseOneSideNull = true; + } + initialized = true; + } catch (Throwable throwable) { + throw new SQLException("failed in init join iterators", throwable); + } finally { + clearThreadPoolExecutor(threadPoolExecutor, futures); } } @Override public Tuple next() throws SQLException { if (!initialized) { - advance(true); - advance(false); - initialized = true; + init(); } - + + if(this.joinResultNullBecauseOneSideNull) { + return null; + } + Tuple next = null; while (next == null && !isEnd()) { if (rhsTuple != null) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java index 08a98e1..4a76bc5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/SortMergeJoinTest.java @@ -36,6 +36,9 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ReadOnlyProps; import 
org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import static org.junit.Assert.assertTrue; @@ -164,4 +167,305 @@ public class SortMergeJoinTest { assertTrue(resultTuple == null); assertTrue(semiAntiJoinIterator.isEnd()); } + + private final long INIT_LATENCY = 10 * 1000L; + + @Test + public void testSortMergeFastReturnNullBug5793() throws SQLException, InterruptedException { + // mock for SortMergeJoinPlan + StatementContext statementContext = Mockito.mock(StatementContext.class); + PhoenixConnection phoenixConnection = Mockito.mock(PhoenixConnection.class); + when(statementContext.getConnection()).thenReturn(phoenixConnection); + ConnectionQueryServices connectionQueryServices = Mockito.mock(ConnectionQueryServices.class); + when(connectionQueryServices.getProps()).thenReturn(ReadOnlyProps.EMPTY_PROPS); + when(phoenixConnection.getQueryServices()).thenReturn(connectionQueryServices); + + List<Expression> expressions = new ArrayList<Expression>(); + Pair<List<Expression>,List<Expression>> lhsAndRhsJoinExpressions = Pair.newPair(expressions, expressions); + Pair<List<OrderByNode>, List<OrderByNode>> lhsAndRhsOrderByNodes = Pair.<List<OrderByNode>, List<OrderByNode>> newPair( + new ArrayList<OrderByNode>(), + new ArrayList<OrderByNode>()); + + //test inner join, lhs long latency and rhs return null. 
+ JoinTableNode.JoinType joinType = JoinTableNode.JoinType.Inner; + ResultIterator lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenAnswer(longLatencyInit()); + QueryPlan lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + QueryPlan rhsQueryPlan = Mockito.mock(QueryPlan.class); + ResultIterator rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenReturn(null); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + SortMergeJoinPlan sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + SortMergeJoinPlan.BasicJoinIterator sortMergeJoinResultIterator = + (SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator(); + + long startTime = System.currentTimeMillis(); + Tuple resultTuple = sortMergeJoinResultIterator.next(); + long elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + + //test inner join, lhs return null and rhs long latency. 
+ joinType = JoinTableNode.JoinType.Inner; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenAnswer(longLatencyInit()); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + sortMergeJoinResultIterator = + (SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + //test left join, lhs return null and rhs long latency. 
+ joinType = JoinTableNode.JoinType.Left; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenAnswer(longLatencyInit()); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + sortMergeJoinResultIterator = (SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + //test full join, lhs return null and rhs return null. 
+ joinType = JoinTableNode.JoinType.Full; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenReturn(null); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + sortMergeJoinResultIterator = (SortMergeJoinPlan.BasicJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(!sortMergeJoinResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + //test left semi join, lhs return null and rhs long latency. 
+ joinType = JoinTableNode.JoinType.Semi; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenAnswer(longLatencyInit()); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + SortMergeJoinPlan.SemiAntiJoinIterator sortMergeJoinSemiAntiResultIterator = + (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinSemiAntiResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + //test left semi join, lhs long latency and rhs return null. 
+ joinType = JoinTableNode.JoinType.Semi; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenAnswer(longLatencyInit()); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenReturn(null); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + sortMergeJoinSemiAntiResultIterator = (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinSemiAntiResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + + //test left anti join, lhs return null and rhs long latency. 
+ joinType = JoinTableNode.JoinType.Anti; + lhsResultIterator = Mockito.mock(ResultIterator.class); + when(lhsResultIterator.next()).thenReturn(null); + lhsQueryPlan = Mockito.mock(QueryPlan.class); + when(lhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(lhsResultIterator); + + rhsQueryPlan = Mockito.mock(QueryPlan.class); + rhsResultIterator = Mockito.mock(ResultIterator.class); + when(rhsResultIterator.next()).thenAnswer(longLatencyInit()); + when(rhsQueryPlan.iterator(DefaultParallelScanGrouper.getInstance())).thenReturn(rhsResultIterator); + + sortMergeJoinPlan = new SortMergeJoinPlan( + statementContext, + null, + null, + joinType, + lhsQueryPlan, + rhsQueryPlan, + lhsAndRhsJoinExpressions, + expressions, + null, + null, + null, + 0, + true, + lhsAndRhsOrderByNodes); + sortMergeJoinSemiAntiResultIterator = + (SortMergeJoinPlan.SemiAntiJoinIterator)sortMergeJoinPlan.iterator(); + + startTime = System.currentTimeMillis(); + resultTuple = sortMergeJoinSemiAntiResultIterator.next(); + elapsed = System.currentTimeMillis() - startTime; + + assertTrue(resultTuple == null); + assertTrue(sortMergeJoinSemiAntiResultIterator.isJoinResultNullBecauseOneSideNull()); + assertTrue(sortMergeJoinSemiAntiResultIterator.isInitialized()); + assertTrue(elapsed < INIT_LATENCY); + } + + private Answer<Tuple> longLatencyInit() { + return new Answer<Tuple>() { + @Override + public Tuple answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(INIT_LATENCY); + Tuple tuple = Mockito.mock(Tuple.class); + return tuple; + } + }; + } }