Repository: asterixdb Updated Branches: refs/heads/ecarm002/interval_join_merge [created] faa3bb457
fix for indexing memory error. Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/199bddd3 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/199bddd3 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/199bddd3 Branch: refs/heads/ecarm002/interval_join_merge Commit: 199bddd3c68ef8e8c3ee0cccf6d8cdcc38fd8189 Parents: f9443fa Author: Preston Carman <prest...@apache.org> Authored: Wed Jun 15 19:06:51 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Wed Jun 15 19:06:51 2016 -0700 ---------------------------------------------------------------------- .../joins/intervalindex/ActiveSweepManager.java | 10 +++---- .../IntervalIndexJoinOperatorDescriptor.java | 1 + .../intervalindex/IntervalIndexJoiner.java | 29 ++++++-------------- .../dataflow/std/join/AbstractMergeJoiner.java | 12 ++++---- .../dataflow/std/join/MergeBranchStatus.java | 6 ++++ .../std/join/MergeJoinOperatorDescriptor.java | 1 + 6 files changed, 29 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java index 70785ab..84bd262 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java @@ -44,18 +44,18 @@ public class ActiveSweepManager { private EndPointIndexItem item = null; private final LinkedList<TuplePointer> active = new LinkedList<>(); - public ActiveSweepManager(IPartitionedDeletableTupleBufferManager bufferManager, int key, int partition, + public ActiveSweepManager(IPartitionedDeletableTupleBufferManager bufferManager, int key, int joinBranch, Comparator<EndPointIndexItem> endPointComparator) { this.bufferManager = bufferManager; this.key = key; - this.partition = partition; + this.partition = joinBranch; indexQueue = new PriorityQueue<>(16, endPointComparator); } - public boolean addTuple(ITupleAccessor leftInputAccessor, TuplePointer tp) throws HyracksDataException { - if (bufferManager.insertTuple(partition, leftInputAccessor, leftInputAccessor.getTupleId(), tp)) { + public boolean addTuple(ITupleAccessor accessor, TuplePointer tp) throws HyracksDataException { + if (bufferManager.insertTuple(partition, accessor, accessor.getTupleId(), tp)) { EndPointIndexItem e = new EndPointIndexItem(tp, EndPointIndexItem.END_POINT, - IntervalJoinUtil.getIntervalEnd(leftInputAccessor, leftInputAccessor.getTupleId(), key)); + IntervalJoinUtil.getIntervalEnd(accessor, accessor.getTupleId(), key)); indexQueue.add(e); active.add(tp); item = indexQueue.peek(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java index c248f72..be44df3 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java @@ -258,6 +258,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip locks.getRight(partition).await(); } state.indexJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer); + state.status.continueRightLoad = false; locks.getLeft(partition).signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index cc164b7..e53f9ae 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -125,7 +125,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { LOGGER.fine("IntervalIndexJoiner has started partition " + partition + " with " + memorySize + " frames of memory."); } - } private void addToResult(IFrameTupleAccessor accessor1, int index1, IFrameTupleAccessor accessor2, int index2, @@ -134,7 +133,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { FrameUtils.appendConcatToWriter(writer, resultAppender, accessor2, index2, accessor1, index1); } else { FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, index1, accessor2, index2); - } } @@ -200,7 +198,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException { TupleStatus leftTs = loadLeftTuple(); TupleStatus rightTs = loadRightTuple(); - while (checkHasMoreProcessing(leftTs, LEFT_PARTITION) && checkHasMoreProcessing(rightTs, RIGHT_PARTITION)) { + while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, LEFT_PARTITION, RIGHT_PARTITION) + && checkHasMoreProcessing(rightTs, RIGHT_PARTITION, LEFT_PARTITION)) { if (status.branch[RIGHT_PARTITION].isRunFileWriting()) { // Right side from disk rightTs = processRightTupleSpill(writer); @@ -221,9 +220,13 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } } - private boolean checkHasMoreProcessing(TupleStatus ts, int partition) { - return ts.isLoaded() || (ts.isEmpty() - && (activeManager[partition].hasRecords() || status.branch[partition].isRunFileWriting())); + private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int joinPartition) { + return ts.isLoaded() || status.branch[partition].isRunFileWriting() + || (checkHasMoreTuples(joinPartition) && activeManager[partition].hasRecords()); + } + + private boolean checkHasMoreTuples(int partition) { + return status.branch[partition].hasMore() || status.branch[partition].isRunFileReading(); } private boolean checkToProcessRightTuple() { @@ -244,7 +247,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { // Process left tuples one by one, check them with active memory from the right branch. - // System.err.println(" - Start processLeftTupleSpill"); int count = 0; TupleStatus ts = loadLeftTuple(); while (ts.isLoaded() && activeManager[RIGHT_PARTITION].hasRecords()) { @@ -273,12 +275,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION], RIGHT_PARTITION); ts = loadLeftTuple(); } - // System.err.println(" - End processLeftTupleSpill"); return ts; } private TupleStatus processRightTupleSpill(IFrameWriter writer) throws HyracksDataException { - // System.err.println(" - Start processRightTupleSpill"); // Process left tuples one by one, check them with active memory from the right branch. int count = 0; TupleStatus ts = loadRightTuple(); @@ -308,12 +308,10 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { unfreezeAndContinue(RIGHT_PARTITION, inputAccessor[RIGHT_PARTITION], LEFT_PARTITION); ts = loadRightTuple(); } - // System.err.println(" - End processRightTupleSpill"); return ts; } private void processLeftTuple(IFrameWriter writer) throws HyracksDataException { - // System.err.println(" +++ Start processLeftTuple"); // Process endpoints do { if ((!activeManager[LEFT_PARTITION].hasRecords() @@ -341,11 +339,9 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION], true, writer); } - // System.err.println(" +++ End processLeftTuple"); } private void processRightTuple(IFrameWriter writer) throws HyracksDataException { - // System.err.println(" +++ Start processRightTuple"); // Process endpoints do { if ((!activeManager[RIGHT_PARTITION].hasRecords() @@ -373,7 +369,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION], false, writer); } - // System.err.println(" +++ End processRightTuple"); } private void processActiveJoin(List<TuplePointer> outer, ITuplePointerAccessor outerAccessor, @@ -383,11 +378,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { outerAccessor.reset(outerTp); for (TuplePointer innerTp : inner) { innerAccessor.reset(innerTp); - // TuplePrinterUtil.printTuple(" --- A outer", outerAccessor, outerTp.tupleIndex); - // TuplePrinterUtil.printTuple(" --- A inner", innerAccessor, innerTp.tupleIndex); if (imjc.checkToSaveInResult(outerAccessor, outerTp.tupleIndex, innerAccessor, innerTp.tupleIndex, reversed)) { - // System.err.println(" -- Matched --"); addToResult(outerAccessor, outerTp.tupleIndex, innerAccessor, innerTp.tupleIndex, reversed, writer); } } @@ -402,11 +394,8 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter writer) throws HyracksDataException { for (TuplePointer outerTp : outer) { outerAccessor.reset(outerTp); - // TuplePrinterUtil.printTuple(" --- outer", outerAccessor, outerTp.tupleIndex); - // TuplePrinterUtil.printTuple(" --- inner", tupleAccessor); if (imjc.checkToSaveInResult(outerAccessor, outerTp.tupleIndex, tupleAccessor, tupleAccessor.getTupleId(), reversed)) { - // System.err.println(" -- Matched --"); addToResult(outerAccessor, outerTp.tupleIndex, tupleAccessor, tupleAccessor.getTupleId(), reversed, writer); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java index e9ed084..f8d328b 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java @@ -27,7 +27,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor; import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor; -import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner.TupleStatus; import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage; public abstract class AbstractMergeJoiner implements IMergeJoiner { @@ -44,6 +43,10 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { public boolean isEmpty() { return this.equals(EMPTY); } + + public boolean isKnown() { + return !this.equals(UNKNOWN); + } } protected static final int JOIN_PARTITIONS = 2; @@ -97,12 +100,12 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { return TupleStatus.LOADED; } - protected TupleStatus loadMemoryTuple(int partition) { + protected TupleStatus loadMemoryTuple(int joinId) { TupleStatus loaded; - if (inputAccessor[partition] != null && inputAccessor[partition].exists()) { + if (inputAccessor[joinId] != null && inputAccessor[joinId].exists()) { // Still processing frame. loaded = TupleStatus.LOADED; - } else if (status.branch[partition].hasMore()) { + } else if (status.branch[joinId].hasMore()) { loaded = TupleStatus.UNKNOWN; } else { // No more frames or tuples to process. @@ -120,7 +123,6 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { inputBuffer[partition].put(buffer.array(), 0, buffer.capacity()); inputAccessor[partition].reset(inputBuffer[partition]); inputAccessor[partition].next(); - status.continueRightLoad = false; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java index f7f9737..4f69189 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java @@ -92,4 +92,10 @@ public class MergeBranchStatus implements IRunFileStreamStatus, Serializable { this.runFileReading = runFileReading; } + @Override + public String toString() { + return "Branch status is " + stage + ": the stream " + (hasMore ? "has more" : "is empty") + + " and the run file is " + (runFileWriting ? "WRITING " : "") + (runFileReading ? "READING " : ""); + + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java index 24abddf..649247e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java @@ -260,6 +260,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { locks.getRight(partition).await(); } state.joiner.setFrame(RIGHT_ACTIVITY_ID, buffer); + state.status.continueRightLoad = false; locks.getLeft(partition).signal(); } catch (InterruptedException e) { Thread.currentThread().interrupt();