This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit e48e9f2dc0c6f54082d463d47318fcade6b632c8 Author: Dmitry Lychagin <[email protected]> AuthorDate: Tue Mar 30 17:12:27 2021 -0700 [ASTERIXDB-2857][RT] Incorrect result for nested loop outer join - user model changes: no - storage format changes: no - interface changes: no Details: - Fix incorrect number of unmatched tuples emitted by nested loop implementation of left outer join - Add RunFileWriter.eraseClosed() method Change-Id: Ib56cf82fbe335a1d8f5d69caaf51e746db252202 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10785 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../leftouterjoin/query-ASTERIXDB-2857.sqlpp | 63 ++++++++ .../leftouterjoin/query-ASTERIXDB-2857.plan | 39 +++++ .../query-ASTERIXDB-2857.1.ddl.sqlpp | 48 ++++++ .../query-ASTERIXDB-2857.2.update.sqlpp | 22 +++ .../query-ASTERIXDB-2857.3.query.sqlpp | 39 +++++ .../query-ASTERIXDB-2857.3.adm | 2 + .../test/resources/runtimets/testsuite_sqlpp.xml | 5 + .../hyracks/dataflow/common/io/RunFileWriter.java | 10 +- .../hyracks/dataflow/std/join/NestedLoopJoin.java | 165 ++++++++++++--------- .../OptimizedHybridHashJoinOperatorDescriptor.java | 5 +- 10 files changed, 326 insertions(+), 72 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp new file mode 100644 index 0000000..2304349 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Test nested loop implementation of left outer join + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type tenkType as closed { + unique1 : integer, + unique2 : integer, + two : integer, + four : integer, + ten : integer, + twenty : integer, + hundred : integer, + thousand : integer, + twothousand : integer, + fivethous : integer, + tenthous : integer, + odd100 : integer, + even100 : integer, + stringu1 : string, + stringu2 : string, + string4 : string +}; + +create dataset tenk(tenkType) primary key unique2; + +SELECT + t0.unique1 AS t0_unique1, + t1.unique1 AS t1_unique1, + t2.unique1 AS t2_unique1 +FROM ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 2 +) t0 +INNER JOIN ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 4 +) t1 ON t0.unique2 = t1.unique2 +LEFT JOIN ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 6 +) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2 +ORDER BY t0_unique1, t1_unique1, t2_unique1; diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan new file mode 100644 index 0000000..393f1db --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan @@ -0,0 +1,39 @@ +-- DISTRIBUTE_RESULT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- SORT_MERGE_EXCHANGE [$$136(ASC), $$137(ASC), $#3(ASC) ] |PARTITIONED| + -- STABLE_SORT [$$136(ASC), $$137(ASC), $#3(ASC)] |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- NESTED_LOOP |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- HYBRID_HASH_JOIN [$$127][$$128] |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- BROADCAST_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp new file mode 100644 index 0000000..87b5d75 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Test nested loop implementation of left outer join + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type tenkType as closed { + unique1 : integer, + unique2 : integer, + two : integer, + four : integer, + ten : integer, + twenty : integer, + hundred : integer, + thousand : integer, + twothousand : integer, + fivethous : integer, + tenthous : integer, + odd100 : integer, + even100 : integer, + stringu1 : string, + stringu2 : string, + string4 : string +}; + +create dataset tenk(tenkType) primary key unique2; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp new file mode 100644 index 0000000..2d7e768 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +load dataset tenk using localfs ((`path`=`asterix_nc1://data/tenk.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp new file mode 100644 index 0000000..823a540 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Test nested loop implementation of left outer join + */ + +USE test; + +SELECT + t0.unique1 AS t0_unique1, + t1.unique1 AS t1_unique1, + t2.unique1 AS t2_unique1 +FROM ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 2 +) t0 +INNER JOIN ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 4 +) t1 ON t0.unique2 = t1.unique2 +LEFT JOIN ( + SELECT unique1, unique2 FROM tenk WHERE unique2 < 6 +) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2 +ORDER BY t0_unique1, t1_unique1, t2_unique1; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm new file mode 100644 index 0000000..1a31db8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm @@ -0,0 +1,2 @@ +{ "t0_unique1": 1891, "t1_unique1": 1891, "t2_unique1": 1891 } +{ "t0_unique1": 8800, "t1_unique1": 8800, "t2_unique1": 8800 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 9dc621d..ce440c1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -12983,6 +12983,11 @@ </compilation-unit> </test-case> <test-case FilePath="leftouterjoin"> + <compilation-unit name="query-ASTERIXDB-2857"> + <output-dir compare="Text">query-ASTERIXDB-2857</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="leftouterjoin"> <compilation-unit name="right_branch_opt_1"> <output-dir compare="Text">right_branch_opt_1</output-dir> </compilation-unit> diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java index dabdd4f..c370b58 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java @@ -75,9 +75,15 @@ public class RunFileWriter implements IFrameWriter { } public void erase() throws HyracksDataException { - close(); - file.delete(); + try { + close(); + } finally { + eraseClosed(); + } + } + public void eraseClosed() { + file.delete(); // Make sure we never access the file if it is deleted. file = null; handle = null; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java index a5ad500..03ff72f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java @@ -20,6 +20,7 @@ package org.apache.hyracks.dataflow.std.join; import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.BitSet; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; @@ -28,6 +29,7 @@ import org.apache.hyracks.api.context.IHyracksJobletContext; import org.apache.hyracks.api.dataflow.value.IMissingWriter; import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator; import org.apache.hyracks.api.dataflow.value.ITuplePairComparator; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -43,6 +45,14 @@ import org.apache.hyracks.dataflow.std.buffermanager.VariableFrameMemoryManager; import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; public class NestedLoopJoin { + // Note: Min memory budget should be less than {@code AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN} + // Inner join: 1 frame for the outer input side, 1 frame for the inner input side, 1 frame for the output + private static final int MIN_FRAME_BUDGET_INNER_JOIN = 3; + // Outer join extra: Add 1 frame for the {@code outerMatchLOJ} bitset + private static final int MIN_FRAME_BUDGET_OUTER_JOIN = MIN_FRAME_BUDGET_INNER_JOIN + 1; + // Outer join needs 1 bit per each tuple in the outer side buffer + private static final int ESTIMATE_AVG_TUPLE_SIZE = 128; + private final FrameTupleAccessor accessorInner; private final FrameTupleAccessor accessorOuter; private final FrameTupleAppender appender; @@ -54,30 +64,45 @@ public class NestedLoopJoin { private final boolean isLeftOuter; private final ArrayTupleBuilder missingTupleBuilder; private final IPredicateEvaluator predEvaluator; - private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal + // Added for handling correct calling for predicate-evaluator upon recursive calls + // (in OptimizedHybridHashJoin) that cause role-reversal + private final boolean isReversed; private final BufferInfo tempInfo = new BufferInfo(null, -1, -1); + private final BitSet outerMatchLOJ; public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter, - FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, + FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter, IMissingWriter[] missingWriters) throws HyracksDataException { + this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, predEval, isLeftOuter, missingWriters, + false); + } + + public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter, + FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter, + IMissingWriter[] missingWriters, boolean isReversed) throws HyracksDataException { this.accessorInner = accessorInner; this.accessorOuter = accessorOuter; this.appender = new FrameTupleAppender(); this.outBuffer = new VSizeFrame(jobletContext); this.innerBuffer = new VSizeFrame(jobletContext); this.appender.reset(outBuffer, true); - if (memSize < 3) { - throw new HyracksDataException("Not enough memory is available for Nested Loop Join"); + + int minMemBudgetInFrames = isLeftOuter ? MIN_FRAME_BUDGET_OUTER_JOIN : MIN_FRAME_BUDGET_INNER_JOIN; + if (memBudgetInFrames < minMemBudgetInFrames) { + throw new HyracksDataException(ErrorCode.INSUFFICIENT_MEMORY); } + int outerBufferMngrMemBudgetInFrames = memBudgetInFrames - minMemBudgetInFrames + 1; + int outerBufferMngrMemBudgetInBytes = jobletContext.getInitialFrameSize() * outerBufferMngrMemBudgetInFrames; this.outerBufferMngr = new VariableFrameMemoryManager( - new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)), - FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2)); + new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory + .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames)); this.predEvaluator = predEval; - this.isReversed = false; - this.isLeftOuter = isLeftOuter; if (isLeftOuter) { + if (isReversed) { + throw new HyracksDataException(ErrorCode.ILLEGAL_STATE, "Outer join cannot reverse roles"); + } int innerFieldCount = this.accessorInner.getFieldCount(); missingTupleBuilder = new ArrayTupleBuilder(innerFieldCount); DataOutput out = missingTupleBuilder.getDataOutput(); @@ -85,9 +110,14 @@ public class NestedLoopJoin { missingWriters[i].writeMissing(out); missingTupleBuilder.addFieldEndOffset(); } + // Outer join needs 1 bit per each tuple in the outer side buffer + int outerMatchLOJCardinalityEstimate = outerBufferMngrMemBudgetInBytes / ESTIMATE_AVG_TUPLE_SIZE; + outerMatchLOJ = new BitSet(Math.max(outerMatchLOJCardinalityEstimate, 1)); } else { missingTupleBuilder = null; + outerMatchLOJ = null; } + this.isReversed = isReversed; FileReference file = jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString()); @@ -117,23 +147,7 @@ public class NestedLoopJoin { return; } if (outerBufferMngr.insertFrame(outerBuffer) < 0) { - RunFileReader runFileReader = runFileWriter.createReader(); - try { - runFileReader.open(); - if (runFileReader.nextFrame(innerBuffer)) { - do { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); - } - } while (runFileReader.nextFrame(innerBuffer)); - } else if (isLeftOuter) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer); - } - } - } finally { - runFileReader.close(); - } + multiBlockJoin(writer); outerBufferMngr.reset(); if (outerBufferMngr.insertFrame(outerBuffer) < 0) { throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity() @@ -142,16 +156,51 @@ public class NestedLoopJoin { } } - private void blockJoin(BufferInfo outerBufferInfo, ByteBuffer innerBuffer, IFrameWriter writer) - throws HyracksDataException { - accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength()); - accessorInner.reset(innerBuffer); - int tupleCount0 = accessorOuter.getTupleCount(); - int tupleCount1 = accessorInner.getTupleCount(); + private void multiBlockJoin(IFrameWriter writer) throws HyracksDataException { + int outerBufferFrameCount = outerBufferMngr.getNumFrames(); + if (outerBufferFrameCount == 0) { + return; + } + RunFileReader runFileReader = runFileWriter.createReader(); + try { + runFileReader.open(); + if (isLeftOuter) { + outerMatchLOJ.clear(); + } + while (runFileReader.nextFrame(innerBuffer)) { + int outerTupleRunningCount = 0; + for (int i = 0; i < outerBufferFrameCount; i++) { + BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo); + accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), + outerBufferInfo.getLength()); + int outerTupleCount = accessorOuter.getTupleCount(); + accessorInner.reset(innerBuffer.getBuffer()); + blockJoin(outerTupleRunningCount, writer); + outerTupleRunningCount += outerTupleCount; + } + } + if (isLeftOuter) { + int outerTupleRunningCount = 0; + for (int i = 0; i < outerBufferFrameCount; i++) { + BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo); + accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), + outerBufferInfo.getLength()); + int outerFrameTupleCount = accessorOuter.getTupleCount(); + appendMissing(outerTupleRunningCount, outerFrameTupleCount, writer); + outerTupleRunningCount += outerFrameTupleCount; + } + } + } finally { + runFileReader.close(); + } + } - for (int i = 0; i < tupleCount0; ++i) { + private void blockJoin(int outerTupleStartPos, IFrameWriter writer) throws HyracksDataException { + int outerTupleCount = accessorOuter.getTupleCount(); + int innerTupleCount = accessorInner.getTupleCount(); + for (int i = 0; i < outerTupleCount; ++i) { boolean matchFound = false; - for (int j = 0; j < tupleCount1; ++j) { + for (int j = 0; j < innerTupleCount; ++j) { int c = tpComparator.compare(accessorOuter, i, accessorInner, j); boolean prdEval = evaluatePredicate(i, j); if (c == 0 && prdEval) { @@ -159,13 +208,8 @@ public class NestedLoopJoin { appendToResults(i, j, writer); } } - - if (!matchFound && isLeftOuter) { - final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets(); - final byte[] ntByteArray = missingTupleBuilder.getByteArray(); - final int ntSize = missingTupleBuilder.getSize(); - FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, - ntSize); + if (isLeftOuter && matchFound) { + outerMatchLOJ.set(outerTupleStartPos + i); } } } @@ -191,15 +235,18 @@ public class NestedLoopJoin { FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2); } - private void appendMissing(BufferInfo outerBufferInfo, IFrameWriter writer) throws HyracksDataException { - accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength()); - int tupleCount = accessorOuter.getTupleCount(); - for (int i = 0; i < tupleCount; ++i) { - final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets(); - final byte[] ntByteArray = missingTupleBuilder.getByteArray(); - final int ntSize = missingTupleBuilder.getSize(); - FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0, - ntSize); + private void appendMissing(int outerFrameMngrStartPos, int outerFrameTupleCount, IFrameWriter writer) + throws HyracksDataException { + int limit = outerFrameMngrStartPos + outerFrameTupleCount; + for (int outerTuplePos = + outerMatchLOJ.nextClearBit(outerFrameMngrStartPos); outerTuplePos < limit; outerTuplePos = + outerMatchLOJ.nextClearBit(outerTuplePos + 1)) { + int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets(); + byte[] ntByteArray = missingTupleBuilder.getByteArray(); + int ntSize = missingTupleBuilder.getSize(); + int outerAccessorTupleIndex = outerTuplePos - outerFrameMngrStartPos; + FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, outerAccessorTupleIndex, ntFieldEndOffsets, + ntByteArray, 0, ntSize); } } @@ -210,22 +257,10 @@ public class NestedLoopJoin { } public void completeJoin(IFrameWriter writer) throws HyracksDataException { - RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader(); try { - runFileReader.open(); - if (runFileReader.nextFrame(innerBuffer)) { - do { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer); - } - } while (runFileReader.nextFrame(innerBuffer)); - } else if (isLeftOuter) { - for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) { - appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer); - } - } + multiBlockJoin(writer); } finally { - runFileReader.close(); + runFileWriter.eraseClosed(); } appender.write(writer, true); } @@ -233,8 +268,4 @@ public class NestedLoopJoin { public void releaseMemory() throws HyracksDataException { outerBufferMngr.reset(); } - - public void setIsReversed(boolean b) { - this.isReversed = b; - } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index c142113..bb79981 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -802,11 +802,10 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD // The nested loop join result is outer + inner. All the other operator is probe + build. // Hence the reverse relation is different. boolean isReversed = outerRd == buildRd && innerRd == probeRd; - assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles"; ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp; NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd), - new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter); - nlj.setIsReversed(isReversed); + new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter, + isReversed); nlj.setComparator(nljComptorOuterInner); IFrame cacheBuff = new VSizeFrame(jobletCtx);
