Repository: asterixdb
Updated Branches:
  refs/heads/ecarm002/interval_join_merge [created] faa3bb457


Fix for indexing memory error.


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/199bddd3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/199bddd3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/199bddd3

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 199bddd3c68ef8e8c3ee0cccf6d8cdcc38fd8189
Parents: f9443fa
Author: Preston Carman <prest...@apache.org>
Authored: Wed Jun 15 19:06:51 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Wed Jun 15 19:06:51 2016 -0700

----------------------------------------------------------------------
 .../joins/intervalindex/ActiveSweepManager.java | 10 +++----
 .../IntervalIndexJoinOperatorDescriptor.java    |  1 +
 .../intervalindex/IntervalIndexJoiner.java      | 29 ++++++--------------
 .../dataflow/std/join/AbstractMergeJoiner.java  | 12 ++++----
 .../dataflow/std/join/MergeBranchStatus.java    |  6 ++++
 .../std/join/MergeJoinOperatorDescriptor.java   |  1 +
 6 files changed, 29 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java
index 70785ab..84bd262 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/ActiveSweepManager.java
@@ -44,18 +44,18 @@ public class ActiveSweepManager {
     private EndPointIndexItem item = null;
     private final LinkedList<TuplePointer> active = new LinkedList<>();
 
-    public ActiveSweepManager(IPartitionedDeletableTupleBufferManager 
bufferManager, int key, int partition,
+    public ActiveSweepManager(IPartitionedDeletableTupleBufferManager 
bufferManager, int key, int joinBranch,
             Comparator<EndPointIndexItem> endPointComparator) {
         this.bufferManager = bufferManager;
         this.key = key;
-        this.partition = partition;
+        this.partition = joinBranch;
         indexQueue = new PriorityQueue<>(16, endPointComparator);
     }
 
-    public boolean addTuple(ITupleAccessor leftInputAccessor, TuplePointer tp) 
throws HyracksDataException {
-        if (bufferManager.insertTuple(partition, leftInputAccessor, 
leftInputAccessor.getTupleId(), tp)) {
+    public boolean addTuple(ITupleAccessor accessor, TuplePointer tp) throws 
HyracksDataException {
+        if (bufferManager.insertTuple(partition, accessor, 
accessor.getTupleId(), tp)) {
             EndPointIndexItem e = new EndPointIndexItem(tp, 
EndPointIndexItem.END_POINT,
-                    IntervalJoinUtil.getIntervalEnd(leftInputAccessor, 
leftInputAccessor.getTupleId(), key));
+                    IntervalJoinUtil.getIntervalEnd(accessor, 
accessor.getTupleId(), key));
             indexQueue.add(e);
             active.add(tp);
             item = indexQueue.peek();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
index c248f72..be44df3 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
@@ -258,6 +258,7 @@ public class IntervalIndexJoinOperatorDescriptor extends 
AbstractOperatorDescrip
                         locks.getRight(partition).await();
                     }
                     state.indexJoiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+                    state.status.continueRightLoad = false;
                     locks.getLeft(partition).signal();
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index cc164b7..e53f9ae 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -125,7 +125,6 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             LOGGER.fine("IntervalIndexJoiner has started partition " + 
partition + " with " + memorySize
                     + " frames of memory.");
         }
-
     }
 
     private void addToResult(IFrameTupleAccessor accessor1, int index1, 
IFrameTupleAccessor accessor2, int index2,
@@ -134,7 +133,6 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             FrameUtils.appendConcatToWriter(writer, resultAppender, accessor2, 
index2, accessor1, index1);
         } else {
             FrameUtils.appendConcatToWriter(writer, resultAppender, accessor1, 
index1, accessor2, index2);
-
         }
     }
 
@@ -200,7 +198,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
     public void processMergeUsingLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
         TupleStatus leftTs = loadLeftTuple();
         TupleStatus rightTs = loadRightTuple();
-        while (checkHasMoreProcessing(leftTs, LEFT_PARTITION) && 
checkHasMoreProcessing(rightTs, RIGHT_PARTITION)) {
+        while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, 
LEFT_PARTITION, RIGHT_PARTITION)
+                && checkHasMoreProcessing(rightTs, RIGHT_PARTITION, 
LEFT_PARTITION)) {
             if (status.branch[RIGHT_PARTITION].isRunFileWriting()) {
                 // Right side from disk
                 rightTs = processRightTupleSpill(writer);
@@ -221,9 +220,13 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         }
     }
 
-    private boolean checkHasMoreProcessing(TupleStatus ts, int partition) {
-        return ts.isLoaded() || (ts.isEmpty()
-                && (activeManager[partition].hasRecords() || 
status.branch[partition].isRunFileWriting()));
+    private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int 
joinPartition) {
+        return ts.isLoaded() || status.branch[partition].isRunFileWriting()
+                || (checkHasMoreTuples(joinPartition) && 
activeManager[partition].hasRecords());
+    }
+
+    private boolean checkHasMoreTuples(int partition) {
+        return status.branch[partition].hasMore() || 
status.branch[partition].isRunFileReading();
     }
 
     private boolean checkToProcessRightTuple() {
@@ -244,7 +247,6 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
     private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws 
HyracksDataException {
         // Process left tuples one by one, check them with active memory from 
the right branch.
-        //        System.err.println("  -  Start processLeftTupleSpill");
         int count = 0;
         TupleStatus ts = loadLeftTuple();
         while (ts.isLoaded() && activeManager[RIGHT_PARTITION].hasRecords()) {
@@ -273,12 +275,10 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION], 
RIGHT_PARTITION);
             ts = loadLeftTuple();
         }
-        //        System.err.println("  -  End processLeftTupleSpill");
         return ts;
     }
 
     private TupleStatus processRightTupleSpill(IFrameWriter writer) throws 
HyracksDataException {
-        //        System.err.println("  -  Start processRightTupleSpill");
         // Process left tuples one by one, check them with active memory from 
the right branch.
         int count = 0;
         TupleStatus ts = loadRightTuple();
@@ -308,12 +308,10 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             unfreezeAndContinue(RIGHT_PARTITION, 
inputAccessor[RIGHT_PARTITION], LEFT_PARTITION);
             ts = loadRightTuple();
         }
-        //        System.err.println("  -  End processRightTupleSpill");
         return ts;
     }
 
     private void processLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
-        //        System.err.println(" +++ Start processLeftTuple");
         // Process endpoints
         do {
             if ((!activeManager[LEFT_PARTITION].hasRecords()
@@ -341,11 +339,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
                     memoryAccessor[LEFT_PARTITION], true, writer);
         }
-        //        System.err.println(" +++ End processLeftTuple");
     }
 
     private void processRightTuple(IFrameWriter writer) throws 
HyracksDataException {
-        //        System.err.println(" +++ Start processRightTuple");
         // Process endpoints
         do {
             if ((!activeManager[RIGHT_PARTITION].hasRecords()
@@ -373,7 +369,6 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
                     memoryAccessor[RIGHT_PARTITION], false, writer);
         }
-        //        System.err.println(" +++ End processRightTuple");
     }
 
     private void processActiveJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
@@ -383,11 +378,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             outerAccessor.reset(outerTp);
             for (TuplePointer innerTp : inner) {
                 innerAccessor.reset(innerTp);
-                //                TuplePrinterUtil.printTuple("     --- A 
outer", outerAccessor, outerTp.tupleIndex);
-                //                TuplePrinterUtil.printTuple("     --- A 
inner", innerAccessor, innerTp.tupleIndex);
                 if (imjc.checkToSaveInResult(outerAccessor, 
outerTp.tupleIndex, innerAccessor, innerTp.tupleIndex,
                         reversed)) {
-                    //                    System.err.println("  -- Matched 
--");
                     addToResult(outerAccessor, outerTp.tupleIndex, 
innerAccessor, innerTp.tupleIndex, reversed, writer);
                 }
             }
@@ -402,11 +394,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter 
writer) throws HyracksDataException {
         for (TuplePointer outerTp : outer) {
             outerAccessor.reset(outerTp);
-            //            TuplePrinterUtil.printTuple("     --- outer", 
outerAccessor, outerTp.tupleIndex);
-            //            TuplePrinterUtil.printTuple("     --- inner", 
tupleAccessor);
             if (imjc.checkToSaveInResult(outerAccessor, outerTp.tupleIndex, 
tupleAccessor, tupleAccessor.getTupleId(),
                     reversed)) {
-                //                System.err.println("  -- Matched --");
                 addToResult(outerAccessor, outerTp.tupleIndex, tupleAccessor, 
tupleAccessor.getTupleId(), reversed,
                         writer);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index e9ed084..f8d328b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.std.buffermanager.ITupleAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.TupleAccessor;
-import org.apache.hyracks.dataflow.std.join.AbstractMergeJoiner.TupleStatus;
 import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
 
 public abstract class AbstractMergeJoiner implements IMergeJoiner {
@@ -44,6 +43,10 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
         public boolean isEmpty() {
             return this.equals(EMPTY);
         }
+
+        public boolean isKnown() {
+            return !this.equals(UNKNOWN);
+        }
     }
 
     protected static final int JOIN_PARTITIONS = 2;
@@ -97,12 +100,12 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
         return TupleStatus.LOADED;
     }
 
-    protected TupleStatus loadMemoryTuple(int partition) {
+    protected TupleStatus loadMemoryTuple(int joinId) {
         TupleStatus loaded;
-        if (inputAccessor[partition] != null && 
inputAccessor[partition].exists()) {
+        if (inputAccessor[joinId] != null && inputAccessor[joinId].exists()) {
             // Still processing frame.
             loaded = TupleStatus.LOADED;
-        } else if (status.branch[partition].hasMore()) {
+        } else if (status.branch[joinId].hasMore()) {
             loaded = TupleStatus.UNKNOWN;
         } else {
             // No more frames or tuples to process.
@@ -120,7 +123,6 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
         inputBuffer[partition].put(buffer.array(), 0, buffer.capacity());
         inputAccessor[partition].reset(inputBuffer[partition]);
         inputAccessor[partition].next();
-        status.continueRightLoad = false;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java
index f7f9737..4f69189 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeBranchStatus.java
@@ -92,4 +92,10 @@ public class MergeBranchStatus implements 
IRunFileStreamStatus, Serializable {
         this.runFileReading = runFileReading;
     }
 
+    @Override
+    public String toString() {
+        return "Branch status is " + stage + ": the stream " + (hasMore ? "has 
more" : "is empty")
+                + " and the run file is " + (runFileWriting ? "WRITING " : "") 
+ (runFileReading ? "READING " : "");
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/199bddd3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 24abddf..649247e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -260,6 +260,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
                         locks.getRight(partition).await();
                     }
                     state.joiner.setFrame(RIGHT_ACTIVITY_ID, buffer);
+                    state.status.continueRightLoad = false;
                     locks.getLeft(partition).signal();
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();

Reply via email to