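snapshot with logging: log per-branch frame and spill statistics in the merge and interval-index joiners; add IMergeJoiner.closeInput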

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1df9a9c7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1df9a9c7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1df9a9c7

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 1df9a9c7db4b686e75b870ee02080f8fe15ca666
Parents: 1fae6ac
Author: Preston Carman <prest...@apache.org>
Authored: Tue Sep 6 15:51:12 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Tue Sep 6 15:51:12 2016 -0700

----------------------------------------------------------------------
 .../joins/AbstractIntervalMergeJoinChecker.java |  6 +-
 .../IntervalIndexJoinOperatorDescriptor.java    |  6 +-
 .../intervalindex/IntervalIndexJoiner.java      | 86 +++++++++++++-------
 .../dataflow/std/join/AbstractMergeJoiner.java  | 24 +++---
 .../hyracks/dataflow/std/join/IMergeJoiner.java |  2 +
 .../std/join/MergeJoinOperatorDescriptor.java   | 18 +++-
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 80 +++++++++++++-----
 .../dataflow/std/join/NestedLoopJoin.java       |  8 +-
 .../dataflow/std/join/RunFileStream.java        |  9 +-
 9 files changed, 162 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
index b461799..ec8ecda 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AbstractIntervalMergeJoinChecker.java
@@ -69,7 +69,7 @@ public abstract class AbstractIntervalMergeJoinChecker 
implements IIntervalMerge
         IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, 
ipRight);
         ipLeft.getEnd(endLeft);
         ipRight.getStart(startRight);
-        return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, 
startRight) >= 0;
+        return ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), endLeft, 
startRight) > 0;
     }
 
     @Override
@@ -79,7 +79,7 @@ public abstract class AbstractIntervalMergeJoinChecker 
implements IIntervalMerge
         IntervalJoinUtil.getIntervalPointable(accessorRight, idRight, tvp, 
ipRight);
         ipLeft.getStart(startLeft);
         ipRight.getEnd(endRight);
-        return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), 
startLeft, endRight) <= 0);
+        return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), 
startLeft, endRight) < 0);
     }
 
     @Override
@@ -102,7 +102,7 @@ public abstract class AbstractIntervalMergeJoinChecker 
implements IIntervalMerge
         IntervalJoinUtil.getIntervalPointable(accessorRight, rightTupleIndex, 
idRight, tvp, ipRight);
         ipLeft.getStart(startLeft);
         ipRight.getEnd(endRight);
-        return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), 
startLeft, endRight) <= 0);
+        return !(ch.compare(ipLeft.getTypeTag(), ipRight.getTypeTag(), 
startLeft, endRight) < 0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
index be44df3..d84fabc 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java
@@ -37,6 +37,7 @@ import 
org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
 import org.apache.hyracks.dataflow.std.join.MergeJoinLocks;
 
 public class IntervalIndexJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
@@ -191,7 +192,7 @@ public class IntervalIndexJoinOperatorDescriptor extends 
AbstractOperatorDescrip
         private static final long serialVersionUID = 1L;
 
         private final ActivityId joinAid;
-        private MergeJoinLocks locks;
+        private final MergeJoinLocks locks;
 
         public RightDataActivityNode(ActivityId id, ActivityId joinAid, 
MergeJoinLocks locks) {
             super(id);
@@ -253,7 +254,8 @@ public class IntervalIndexJoinOperatorDescriptor extends 
AbstractOperatorDescrip
                     first = false;
                 }
                 try {
-                    while (!state.status.continueRightLoad && 
state.status.branch[LEFT_ACTIVITY_ID].hasMore()) {
+                    while (!state.status.continueRightLoad
+                            && 
state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
                         // Wait for the state to request right frame unless 
left has finished.
                         locks.getRight(partition).await();
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index e4c4cbe..965411b 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -74,8 +74,11 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner 
{
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
     private long spillCount = 0;
-    private long spillReadCount = 0;
-    private long spillWriteCount = 0;
+    private long leftSpillCount = 0;
+    private long rightSpillCount = 0;
+    private long[] spillFileCount = { 0, 0 };
+    private long[] spillReadCount = { 0, 0 };
+    private long[] spillWriteCount = { 0, 0 };
 
     public IntervalIndexJoiner(IHyracksTaskContext ctx, int memorySize, int 
partition, MergeStatus status,
             MergeJoinLocks locks, Comparator<EndPointIndexItem> 
endPointComparator,
@@ -109,10 +112,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         memoryAccessor[RIGHT_PARTITION] = 
bufferManager.getTuplePointerAccessor(rightRd);
 
         activeManager = new ActiveSweepManager[JOIN_PARTITIONS];
-        activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, 
leftKey, LEFT_PARTITION,
-                endPointComparator);
-        activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, 
rightKey, RIGHT_PARTITION,
-                endPointComparator);
+        activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, 
leftKey, LEFT_PARTITION, endPointComparator);
+        activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, 
rightKey, RIGHT_PARTITION, endPointComparator);
 
         // Run files for both branches
         runFileStream = new RunFileStream[JOIN_PARTITIONS];
@@ -143,9 +144,12 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         runFileStream[LEFT_PARTITION].close();
         runFileStream[RIGHT_PARTITION].close();
         if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("IntervalIndexJoiner statitics: " + 
joinComparisonCount + " comparisons, " + joinResultCount
-                    + " results, " + spillCount + " spills, " + 
spillWriteCount + " spill frames written, "
-                    + spillReadCount + " spill frames read.");
+            LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " 
comparisons, " + joinResultCount
+                    + " results, left[" + leftSpillCount + " spills, " + 
runFileStream[LEFT_PARTITION].getFileCount() + " files, "
+                    + runFileStream[LEFT_PARTITION].getWriteCount() + " 
written, " + runFileStream[LEFT_PARTITION].getReadCount()
+                    + " read]. right[" + rightSpillCount + " spills, " + 
runFileStream[RIGHT_PARTITION].getFileCount()
+                    + " files, " + 
runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+                    + runFileStream[RIGHT_PARTITION].getReadCount() + " 
read].");
         }
     }
 
@@ -237,11 +241,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         long leftStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
         long rightStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
         if (leftStart < rightStart) {
-            return activeManager[RIGHT_PARTITION].hasRecords()
-                    && activeManager[RIGHT_PARTITION].getTopPoint() < 
leftStart;
+            return activeManager[RIGHT_PARTITION].hasRecords() && 
activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
         } else {
-            return !(activeManager[LEFT_PARTITION].hasRecords()
-                    && activeManager[LEFT_PARTITION].getTopPoint() < 
rightStart);
+            return !(activeManager[LEFT_PARTITION].hasRecords() && 
activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
         }
     }
 
@@ -258,8 +260,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             if 
(checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION],
 leftKey), sweep)
                     || !imjc.checkToRemoveRightActive()) {
                 // Add individual tuples.
-                
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION],
-                        inputAccessor[LEFT_PARTITION], true, writer);
+                
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true,
+                        writer);
                 
runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]);
                 inputAccessor[LEFT_PARTITION].next();
                 ts = loadLeftTuple();
@@ -276,7 +278,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Memory is empty and we can start processing the run file.
         if (activeManager[RIGHT_PARTITION].isEmpty() || ts.isEmpty()) {
-            unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION], 
RIGHT_PARTITION);
+            unfreezeAndContinue(LEFT_PARTITION, inputAccessor[LEFT_PARTITION]);
             ts = loadLeftTuple();
         }
         return ts;
@@ -291,8 +293,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             if 
(checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION],
 rightKey), sweep)
                     || !imjc.checkToRemoveLeftActive()) {
                 // Add individual tuples.
-                
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION],
-                        inputAccessor[RIGHT_PARTITION], false, writer);
+                
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false,
+                        writer);
                 
runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]);
                 inputAccessor[RIGHT_PARTITION].next();
                 ts = loadRightTuple();
@@ -309,7 +311,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Memory is empty and we can start processing the run file.
         if (!activeManager[LEFT_PARTITION].hasRecords() || ts.isEmpty()) {
-            unfreezeAndContinue(RIGHT_PARTITION, 
inputAccessor[RIGHT_PARTITION], LEFT_PARTITION);
+            unfreezeAndContinue(RIGHT_PARTITION, 
inputAccessor[RIGHT_PARTITION]);
             ts = loadRightTuple();
         }
         return ts;
@@ -318,9 +320,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
     private void processLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
         // Process endpoints
         do {
-            if ((!activeManager[LEFT_PARTITION].hasRecords()
-                    || 
checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION],
 leftKey),
-                            activeManager[LEFT_PARTITION].getTopPoint()))
+            if ((!activeManager[LEFT_PARTITION].hasRecords() || 
checkToProcessAdd(
+                    
IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), 
activeManager[LEFT_PARTITION].getTopPoint()))
                     || !imjc.checkToRemoveLeftActive()) {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
@@ -340,8 +341,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Add Results
         if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
-                    memoryAccessor[LEFT_PARTITION], true, writer);
+            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION],
+                    true, writer);
         }
     }
 
@@ -370,8 +371,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Add Results
         if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
-                    memoryAccessor[RIGHT_PARTITION], false, writer);
+            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION],
+                    false, writer);
         }
     }
 
@@ -417,6 +418,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", 
Right memory tuples: "
                         + bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
             }
+            rightSpillCount++;
         } else {
             runFileStream[LEFT_PARTITION].startRunFile();
             if (LOGGER.isLoggable(Level.FINE)) {
@@ -424,7 +426,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", 
Right memory tuples: "
                         + bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
             }
+            leftSpillCount++;
         }
+        spillCount++;
     }
 
     private void continueStream(int diskPartition, ITupleAccessor accessor) 
throws HyracksDataException {
@@ -434,13 +438,27 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Continue with stream (" + diskPartition + ").");
         }
-        spillCount++;
-        spillReadCount += runFileStream[diskPartition].getReadCount();
-        spillWriteCount += runFileStream[diskPartition].getWriteCount();
     }
 
-    private void unfreezeAndContinue(int frozenPartition, ITupleAccessor 
accessor, int flushPartition)
-            throws HyracksDataException {
+    private void unfreezeAndContinue(int frozenPartition, ITupleAccessor 
accessor) throws HyracksDataException {
+        int flushPartition = frozenPartition == LEFT_PARTITION ? 
RIGHT_PARTITION : LEFT_PARTITION;
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("snapshot(" + frozenPartition + "): " + 
frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+                    + " left, left[" + leftSpillCount + " spills, "
+                    + (runFileStream[LEFT_PARTITION].getFileCount() - 
spillFileCount[LEFT_PARTITION]) + " files, "
+                    + (runFileStream[LEFT_PARTITION].getWriteCount() - 
spillWriteCount[LEFT_PARTITION]) + " written, "
+                    + (runFileStream[LEFT_PARTITION].getReadCount() - 
spillReadCount[LEFT_PARTITION]) + " read]. right[" + rightSpillCount
+                    + " spills, " + 
(runFileStream[RIGHT_PARTITION].getFileCount() - 
spillFileCount[RIGHT_PARTITION]) + " files, "
+                    + (runFileStream[RIGHT_PARTITION].getWriteCount() - 
spillWriteCount[RIGHT_PARTITION]) + " written, "
+                    + (runFileStream[RIGHT_PARTITION].getReadCount() - 
spillReadCount[RIGHT_PARTITION]) + " read].");
+            spillFileCount[LEFT_PARTITION] = 
runFileStream[LEFT_PARTITION].getFileCount();
+            spillReadCount[LEFT_PARTITION] = 
runFileStream[LEFT_PARTITION].getReadCount();
+            spillWriteCount[LEFT_PARTITION] = 
runFileStream[LEFT_PARTITION].getWriteCount();
+            spillFileCount[RIGHT_PARTITION] = 
runFileStream[RIGHT_PARTITION].getFileCount();
+            spillReadCount[RIGHT_PARTITION] = 
runFileStream[RIGHT_PARTITION].getReadCount();
+            spillWriteCount[RIGHT_PARTITION] = 
runFileStream[RIGHT_PARTITION].getWriteCount();
+        }
+
         runFileStream[frozenPartition].flushAndStopRunFile(accessor);
         flushMemory(flushPartition);
         if ((LEFT_PARTITION == frozenPartition && 
!status.branch[LEFT_PARTITION].isRunFileReading())
@@ -453,4 +471,10 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         }
     }
 
+    @Override
+    public void closeInput(int partition) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
index 8006790..aa065cd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java
@@ -60,6 +60,7 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
 
     private final int partition;
     private final MergeJoinLocks locks;
+    protected long[] frameCounts = { 0, 0 };
 
     public AbstractMergeJoiner(IHyracksTaskContext ctx, int partition, 
MergeStatus status, MergeJoinLocks locks,
             RecordDescriptor leftRd, RecordDescriptor rightRd) throws 
HyracksDataException {
@@ -98,12 +99,13 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
         return TupleStatus.LOADED;
     }
 
-    protected TupleStatus loadMemoryTuple(int joinId) {
+    protected TupleStatus loadMemoryTuple(int branch) {
         TupleStatus loaded;
-        if (inputAccessor[joinId] != null && inputAccessor[joinId].exists()) {
+        if (inputAccessor[branch] != null && inputAccessor[branch].exists()) {
             // Still processing frame.
+            int test = inputAccessor[branch].getTupleCount();
             loaded = TupleStatus.LOADED;
-        } else if (status.branch[joinId].hasMore()) {
+        } else if (status.branch[branch].hasMore()) {
             loaded = TupleStatus.UNKNOWN;
         } else {
             // No more frames or tuples to process.
@@ -113,14 +115,14 @@ public abstract class AbstractMergeJoiner implements 
IMergeJoiner {
     }
 
     @Override
-    public void setFrame(int partition, ByteBuffer buffer) {
-        inputBuffer[partition].clear();
-        if (inputBuffer[partition].capacity() < buffer.capacity()) {
-            inputBuffer[partition].limit(buffer.capacity());
+    public void setFrame(int branch, ByteBuffer buffer) {
+        inputBuffer[branch].clear();
+        if (inputBuffer[branch].capacity() < buffer.capacity()) {
+            inputBuffer[branch].limit(buffer.capacity());
         }
-        inputBuffer[partition].put(buffer.array(), 0, buffer.capacity());
-        inputAccessor[partition].reset(inputBuffer[partition]);
-        inputAccessor[partition].next();
+        inputBuffer[branch].put(buffer.array(), 0, buffer.capacity());
+        inputAccessor[branch].reset(inputBuffer[branch]);
+        inputAccessor[branch].next();
+        frameCounts[branch]++;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
index 61ddde1..4268ec9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java
@@ -31,4 +31,6 @@ public interface IMergeJoiner {
 
     void setFrame(int partition, ByteBuffer buffer);
 
+    void closeInput(int partition) throws HyracksDataException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 5624bb5..cbe1a66 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -34,6 +34,7 @@ import 
org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.join.MergeBranchStatus.Stage;
 
 /**
  * The merge join is made up of two operators: left and right.
@@ -105,6 +106,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
             private final RecordDescriptor leftRd;
             private MergeJoinTaskState state;
             private boolean first = true;
+            int count = 0;
 
             public LeftJoinerOperator(IHyracksTaskContext ctx, int partition, 
RecordDescriptor inRecordDesc) {
                 this.ctx = ctx;
@@ -141,6 +143,8 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 locks.getLock(partition).lock();
+
+                count++;
                 if (first) {
                     state.status.branch[LEFT_ACTIVITY_ID].setStageData();
                     first = false;
@@ -171,6 +175,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
                     if (state.failed) {
                         writer.fail();
                     } else {
+                        state.joiner.closeInput(LEFT_ACTIVITY_ID);
                         state.joiner.processMergeUsingLeftTuple(writer);
                         state.joiner.closeResult(writer);
                         writer.close();
@@ -180,6 +185,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
                 } finally {
                     locks.getLock(partition).unlock();
                 }
+//                System.err.println("Left next calls: " + count);
             }
         }
     }
@@ -188,7 +194,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
         private static final long serialVersionUID = 1L;
 
         private final ActivityId joinAid;
-        private MergeJoinLocks locks;
+        private final MergeJoinLocks locks;
 
         public RightDataActivityNode(ActivityId id, ActivityId joinAid, 
MergeJoinLocks locks) {
             super(id);
@@ -202,8 +208,8 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
                 throws HyracksDataException {
             locks.setPartitions(nPartitions);
             RecordDescriptor inRecordDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IMergeJoinChecker mjc = 
mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
-                    partition, ctx);
+            final IMergeJoinChecker mjc = 
mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys, partition,
+                    ctx);
             return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
         }
 
@@ -215,6 +221,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
             private final IMergeJoinChecker mjc;
             private MergeJoinTaskState state;
             private boolean first = true;
+            int count = 0;
 
             public RightDataOperator(IHyracksTaskContext ctx, int partition, 
RecordDescriptor inRecordDesc,
                     IMergeJoinChecker mjc) {
@@ -250,12 +257,14 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 locks.getLock(partition).lock();
+                count++;
                 if (first) {
                     state.status.branch[RIGHT_ACTIVITY_ID].setStageData();
                     first = false;
                 }
                 try {
-                    while (!state.status.continueRightLoad && 
state.status.branch[LEFT_ACTIVITY_ID].hasMore()) {
+                    while (!state.status.continueRightLoad
+                            && 
state.status.branch[LEFT_ACTIVITY_ID].getStatus() != Stage.CLOSED) {
                         // Wait for the state to request right frame unless 
left has finished.
                         locks.getRight(partition).await();
                     }
@@ -289,6 +298,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
                 } finally {
                     locks.getLock(partition).unlock();
                 }
+//                System.err.println("Right next calls: " + count);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index 03283d3..c1a828f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -60,8 +60,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
-    private long spillWriteCount = 0;
-    private long spillReadCount = 0;
+//    private long spillFileCount = 0;
+//    private long spillWriteCount = 0;
+//    private long spillReadCount = 0;
     private long spillCount = 0;
 
     public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, 
MergeStatus status, MergeJoinLocks locks,
@@ -114,12 +115,14 @@ public class MergeJoiner extends AbstractMergeJoiner {
         resultAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.WARNING)) {
             LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " 
comparisons, " + joinResultCount
-                    + " results, " + spillCount + " spills, " + 
spillWriteCount + " spill frames written, "
-                    + spillReadCount + " spill frames read.");
+                    + " results, " + spillCount + " spills, " + 
runFileStream.getFileCount() + " files, "
+                    + runFileStream.getWriteCount() + " spill frames written, 
" + runFileStream.getReadCount()
+                    + " spill frames read.");
         }
     }
 
     private void flushMemory() throws HyracksDataException {
+        memoryBuffer.clear();
         bufferManager.reset();
     }
 
@@ -151,7 +154,11 @@ public class MergeJoiner extends AbstractMergeJoiner {
         if (status.branch[LEFT_PARTITION].isRunFileReading()) {
             loaded = loadSpilledTuple(LEFT_PARTITION);
             if (loaded.isEmpty()) {
-                continueStream(inputAccessor[LEFT_PARTITION]);
+                if (status.branch[LEFT_PARTITION].isRunFileWriting() && 
!status.branch[LEFT_PARTITION].hasMore()) {
+                    unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
+                } else {
+                    continueStream(inputAccessor[LEFT_PARTITION]);
+                }
                 loaded = loadLeftTuple();
             }
         } else {
@@ -169,6 +176,13 @@ public class MergeJoiner extends AbstractMergeJoiner {
         return TupleStatus.LOADED;
     }
 
+    @Override
+    public void closeInput(int partition) throws HyracksDataException {
+        if (status.branch[partition].isRunFileWriting()) {
+            unfreezeAndContinue(inputAccessor[partition]);
+        }
+    }
+
     /**
      * Left
      *
@@ -176,48 +190,54 @@ public class MergeJoiner extends AbstractMergeJoiner {
      */
     @Override
     public void processMergeUsingLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
-        TupleStatus ts = loadLeftTuple();
-        while (ts.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || 
memoryHasTuples())) {
+        TupleStatus leftTs = loadLeftTuple();
+        TupleStatus rightTs = loadRightTuple();
+        while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() 
|| memoryHasTuples())) {
             if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
                 // Left side from disk
-                processLeftTupleSpill(writer);
-                ts = loadLeftTuple();
-            } else if (loadRightTuple().isLoaded()
-                    && 
mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], 
inputAccessor[RIGHT_PARTITION])) {
+                leftTs = processLeftTupleSpill(writer);
+            } else if (rightTs.isLoaded() && 
mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], 
inputAccessor[RIGHT_PARTITION])) {
                 // Right side from stream
                 processRightTuple();
+                rightTs = loadRightTuple();
             } else {
                 // Left side from stream
                 processLeftTuple(writer);
-                ts = loadLeftTuple();
+                leftTs = loadLeftTuple();
             }
         }
     }
 
-    private void processLeftTupleSpill(IFrameWriter writer) throws 
HyracksDataException {
+    private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws 
HyracksDataException {
+        //        System.err.print("Spill ");
+
         runFileStream.addToRunFile(inputAccessor[LEFT_PARTITION]);
         processLeftTuple(writer);
+
         // Memory is empty and we can start processing the run file.
         if (!memoryHasTuples() && 
status.branch[LEFT_PARTITION].isRunFileWriting()) {
             unfreezeAndContinue(inputAccessor[LEFT_PARTITION]);
         }
+        return loadLeftTuple();
     }
 
     private void processLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
+        //        TuplePrinterUtil.printTuple("Left", inputAccessor[LEFT]);
         // Check against memory (right)
         if (memoryHasTuples()) {
             for (int i = memoryBuffer.size() - 1; i > -1; --i) {
                 memoryAccessor.reset(memoryBuffer.get(i));
-                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
-                        memoryAccessor, memoryBuffer.get(i).getTupleIndex(), 
false)) {
+                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+                        memoryBuffer.get(i).getTupleIndex(), false)) {
                     // add to result
-                    addToResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
-                            memoryAccessor, 
memoryBuffer.get(i).getTupleIndex(), writer);
+                    addToResult(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+                            memoryBuffer.get(i).getTupleIndex(), writer);
                 }
                 joinComparisonCount++;
-                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(),
-                        memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
+                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], 
inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
+                        memoryBuffer.get(i).getTupleIndex())) {
                     // remove from memory
+                    //                    TuplePrinterUtil.printTuple("Remove 
Memory", memoryAccessor, memoryBuffer.get(i).getTupleIndex());
                     removeFromMemory(memoryBuffer.get(i));
                 }
             }
@@ -234,30 +254,45 @@ public class MergeJoiner extends AbstractMergeJoiner {
                 return;
             }
         }
+        //        TuplePrinterUtil.printTuple("Memory", inputAccessor[RIGHT]);
         inputAccessor[RIGHT_PARTITION].next();
     }
 
     private void freezeAndSpill() throws HyracksDataException {
+//        System.err.println("freezeAndSpill");
+
         runFileStream.startRunFile();
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
                     "Memory is full. Freezing the right branch. (memory 
tuples: " + bufferManager.getNumTuples() + ")");
         }
+        spillCount++;
     }
 
     private void continueStream(ITupleAccessor accessor) throws 
HyracksDataException {
+//        System.err.println("continueStream");
+
         runFileStream.closeRunFile();
         accessor.reset(inputBuffer[LEFT_PARTITION]);
         accessor.setTupleId(leftStreamIndex);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Continue with left stream.");
         }
-        spillCount++;
-        spillReadCount += runFileStream.getReadCount();
-        spillWriteCount += runFileStream.getWriteCount();
     }
 
     private void unfreezeAndContinue(ITupleAccessor accessor) throws 
HyracksDataException {
+//        System.err.println("unfreezeAndContinue");
+//        if (LOGGER.isLoggable(Level.WARNING)) {
+//            LOGGER.warning("snapshot: " + frameCounts[RIGHT] + " right, " + 
frameCounts[LEFT] + " left, "
+//                    + joinComparisonCount + " comparisons, " + 
joinResultCount + " results, " + spillCount + " spills, "
+//                    + (runFileStream.getFileCount() - spillFileCount) + " 
files, "
+//                    + (runFileStream.getWriteCount() - spillWriteCount) + " 
spill frames written, "
+//                    + (runFileStream.getReadCount() - spillReadCount) + " 
spill frames read.");
+//            spillFileCount = runFileStream.getFileCount();
+//            spillReadCount = runFileStream.getReadCount();
+//            spillWriteCount = runFileStream.getWriteCount();
+//        }
+
         runFileStream.flushAndStopRunFile(accessor);
         flushMemory();
         if (!status.branch[LEFT_PARTITION].isRunFileReading()) {
@@ -267,6 +302,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Unfreezing right partition.");
         }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 3d99d6c..5cc36ce 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -62,6 +62,7 @@ public class NestedLoopJoin {
     private boolean isReversed; //Added for handling correct calling for 
predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that 
cause role-reversal
     private BufferInfo tempInfo = new BufferInfo(null, -1, -1);
 
+    private final int partition;
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
     private long spillWriteCount = 0;
@@ -104,6 +105,8 @@ public class NestedLoopJoin {
                 .createManagedWorkspaceFile(this.getClass().getSimpleName() + 
this.toString());
         runFileWriter = new RunFileWriter(file, ctx.getIOManager());
         runFileWriter.open();
+
+        partition = ctx.getTaskAttemptId().getTaskId().getPartition();
     }
 
     public void cache(ByteBuffer buffer) throws HyracksDataException {
@@ -202,8 +205,9 @@ public class NestedLoopJoin {
         appender.write(writer, true);
 
         if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("NestedLoopJoin statitics: " + joinComparisonCount 
+ " comparisons, " + joinResultCount
-                    + " results, " + spillWriteCount + " frames written, " + 
spillReadCount + " frames read.");
+            LOGGER.warning("NestedLoopJoin statitics: " + partition + " 
partition, " + joinComparisonCount
+                    + " comparisons, " + joinResultCount + " results, " + 
spillWriteCount + " frames written, "
+                    + spillReadCount + " frames read.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1df9a9c7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index aaaaaf4..042b85e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -48,6 +48,7 @@ public class RunFileStream {
     private long runFileCounter = 0;
     private long readCount = 0;
     private long writeCount = 0;
+    private long tupleCount = 0;
 
     public RunFileStream(IHyracksTaskContext ctx, String key, 
IRunFileStreamStatus status) throws HyracksDataException {
         this.ctx = ctx;
@@ -58,6 +59,10 @@ public class RunFileStream {
         runFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
     }
 
+    public long getFileCount() {
+        return runFileCounter;
+    }
+
     public long getReadCount() {
         return readCount;
     }
@@ -67,8 +72,6 @@ public class RunFileStream {
     }
 
     public void startRunFile() throws HyracksDataException {
-        readCount = 0;
-        writeCount = 0;
         runFileCounter++;
 
         status.setRunFileWriting(true);
@@ -89,7 +92,9 @@ public class RunFileStream {
             runFileAppender.write(runFileWriter, true);
             writeCount++;
             runFileAppender.append(accessor, idx);
+            tupleCount = 0;
         }
+        tupleCount++;
     }
 
     public void openRunFile(ITupleAccessor accessor) throws 
HyracksDataException {

Reply via email to