snapshot - reuse frames in deletable partition manager.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/6c312141
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/6c312141
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/6c312141

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 6c3121413762f3556d884e16642cefa654e42868
Parents: f186da9
Author: Preston Carman <prest...@apache.org>
Authored: Tue Sep 13 19:37:32 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Tue Sep 13 19:37:32 2016 -0700

----------------------------------------------------------------------
 .../operators/joins/IntervalJoinUtil.java       |  1 +
 .../intervalindex/IntervalIndexJoiner.java      | 70 ++++++++++++--------
 .../InMemoryIntervalPartitionJoin.java          |  5 +-
 .../std/buffermanager/IFrameBufferManager.java  | 24 ++++++-
 ...IPartitionedDeletableTupleBufferManager.java |  2 +
 .../VPartitionDeletableTupleBufferManager.java  | 50 ++++++++++++--
 .../VPartitionTupleBufferManager.java           | 57 ++++++++++++++--
 .../VariableFrameMemoryManager.java             | 23 +++++++
 .../hyracks/dataflow/std/join/MergeJoiner.java  | 53 ++++++++-------
 .../dataflow/std/join/RunFileStream.java        |  1 +
 .../sort/util/DeletableFrameTupleAppender.java  | 26 +++++++-
 .../IAppendDeletableFrameTupleAccessor.java     |  3 +
 12 files changed, 249 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
index 3e380c7..7c37f08 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IntervalJoinUtil.java
@@ -48,6 +48,7 @@ public class IntervalJoinUtil {
     }
 
     public static long getIntervalStart(IFrameTupleAccessor accessor, int 
tupleId, int fieldId) {
+        int length = accessor.getTupleLength(tupleId);
         int start = accessor.getTupleStartOffset(tupleId) + 
accessor.getFieldSlotsLength()
                 + accessor.getFieldStartOffset(tupleId, fieldId) + 1;
         long intervalStart = 
AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), 
start);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 965411b..d3aaa65 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -73,7 +73,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
 
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
-    private long spillCount = 0;
     private long leftSpillCount = 0;
     private long rightSpillCount = 0;
     private long[] spillFileCount = { 0, 0 };
@@ -104,16 +103,21 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             throw new HyracksDataException(
                     "IntervalIndexJoiner does not have enough memory (needs > 
4, got " + memorySize + ").");
         }
+        //        bufferManager = new 
VPartitionDeletableTupleBufferManager(ctx,
+        //                VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, 
JOIN_PARTITIONS,
+        //                (memorySize - 4) * ctx.getInitialFrameSize(), 
recordDescriptors);
         bufferManager = new VPartitionDeletableTupleBufferManager(ctx,
                 VPartitionDeletableTupleBufferManager.NO_CONSTRAIN, 
JOIN_PARTITIONS,
-                (memorySize - 4) * ctx.getInitialFrameSize(), 
recordDescriptors);
+                memorySize * ctx.getInitialFrameSize(), recordDescriptors);
         memoryAccessor = new ITuplePointerAccessor[JOIN_PARTITIONS];
         memoryAccessor[LEFT_PARTITION] = 
bufferManager.getTuplePointerAccessor(leftRd);
         memoryAccessor[RIGHT_PARTITION] = 
bufferManager.getTuplePointerAccessor(rightRd);
 
         activeManager = new ActiveSweepManager[JOIN_PARTITIONS];
-        activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, 
leftKey, LEFT_PARTITION, endPointComparator);
-        activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, 
rightKey, RIGHT_PARTITION, endPointComparator);
+        activeManager[LEFT_PARTITION] = new ActiveSweepManager(bufferManager, 
leftKey, LEFT_PARTITION,
+                endPointComparator);
+        activeManager[RIGHT_PARTITION] = new ActiveSweepManager(bufferManager, 
rightKey, RIGHT_PARTITION,
+                endPointComparator);
 
         // Run files for both branches
         runFileStream = new RunFileStream[JOIN_PARTITIONS];
@@ -144,11 +148,12 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         runFileStream[LEFT_PARTITION].close();
         runFileStream[RIGHT_PARTITION].close();
         if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " 
comparisons, " + joinResultCount
-                    + " results, left[" + leftSpillCount + " spills, " + 
runFileStream[LEFT_PARTITION].getFileCount() + " files, "
-                    + runFileStream[LEFT_PARTITION].getWriteCount() + " 
written, " + runFileStream[LEFT_PARTITION].getReadCount()
-                    + " read]. right[" + rightSpillCount + " spills, " + 
runFileStream[RIGHT_PARTITION].getFileCount()
-                    + " files, " + 
runFileStream[RIGHT_PARTITION].getWriteCount() + " written, "
+            LOGGER.warning("IntervalIndexJoiner statitics: " + 
joinComparisonCount + " comparisons, " + joinResultCount
+                    + " results, left[" + leftSpillCount + " spills, " + 
runFileStream[LEFT_PARTITION].getFileCount()
+                    + " files, " + 
runFileStream[LEFT_PARTITION].getWriteCount() + " written, "
+                    + runFileStream[LEFT_PARTITION].getReadCount() + " read]. 
right[" + rightSpillCount + " spills, "
+                    + runFileStream[RIGHT_PARTITION].getFileCount() + " files, 
"
+                    + runFileStream[RIGHT_PARTITION].getWriteCount() + " 
written, "
                     + runFileStream[RIGHT_PARTITION].getReadCount() + " 
read].");
         }
     }
@@ -241,14 +246,16 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         long leftStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
         long rightStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
         if (leftStart < rightStart) {
-            return activeManager[RIGHT_PARTITION].hasRecords() && 
activeManager[RIGHT_PARTITION].getTopPoint() < leftStart;
+            return activeManager[RIGHT_PARTITION].hasRecords()
+                    && activeManager[RIGHT_PARTITION].getTopPoint() < 
leftStart;
         } else {
-            return !(activeManager[LEFT_PARTITION].hasRecords() && 
activeManager[LEFT_PARTITION].getTopPoint() < rightStart);
+            return !(activeManager[LEFT_PARTITION].hasRecords()
+                    && activeManager[LEFT_PARTITION].getTopPoint() < 
rightStart);
         }
     }
 
     private boolean checkToProcessAdd(long startMemory, long endMemory) {
-        return startMemory <= endMemory;
+        return startMemory < endMemory;
     }
 
     private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws 
HyracksDataException {
@@ -260,8 +267,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             if 
(checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION],
 leftKey), sweep)
                     || !imjc.checkToRemoveRightActive()) {
                 // Add individual tuples.
-                
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], inputAccessor[LEFT_PARTITION], true,
-                        writer);
+                
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION],
+                        inputAccessor[LEFT_PARTITION], true, writer);
                 
runFileStream[LEFT_PARTITION].addToRunFile(inputAccessor[LEFT_PARTITION]);
                 inputAccessor[LEFT_PARTITION].next();
                 ts = loadLeftTuple();
@@ -293,8 +300,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             if 
(checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION],
 rightKey), sweep)
                     || !imjc.checkToRemoveLeftActive()) {
                 // Add individual tuples.
-                
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION], false,
-                        writer);
+                
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION],
+                        inputAccessor[RIGHT_PARTITION], false, writer);
                 
runFileStream[RIGHT_PARTITION].addToRunFile(inputAccessor[RIGHT_PARTITION]);
                 inputAccessor[RIGHT_PARTITION].next();
                 ts = loadRightTuple();
@@ -320,8 +327,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
     private void processLeftTuple(IFrameWriter writer) throws 
HyracksDataException {
         // Process endpoints
         do {
-            if ((!activeManager[LEFT_PARTITION].hasRecords() || 
checkToProcessAdd(
-                    
IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey), 
activeManager[LEFT_PARTITION].getTopPoint()))
+            if ((!activeManager[LEFT_PARTITION].hasRecords()
+                    || 
checkToProcessAdd(IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION],
 leftKey),
+                            activeManager[LEFT_PARTITION].getTopPoint()))
                     || !imjc.checkToRemoveLeftActive()) {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
@@ -341,8 +349,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Add Results
         if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer, memoryAccessor[LEFT_PARTITION],
-                    true, writer);
+            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
+                    memoryAccessor[LEFT_PARTITION], true, writer);
         }
     }
 
@@ -371,8 +379,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
         // Add Results
         if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer, memoryAccessor[RIGHT_PARTITION],
-                    false, writer);
+            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
+                    memoryAccessor[RIGHT_PARTITION], false, writer);
         }
     }
 
@@ -411,6 +419,11 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
     }
 
     private void freezeAndSpill() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] 
+ " right, " + frameCounts[LEFT_PARTITION]
+                    + " left, left[" + 
bufferManager.getNumTuples(LEFT_PARTITION) + " memory]. right["
+                    + bufferManager.getNumTuples(RIGHT_PARTITION) + " 
memory].");
+        }
         if (bufferManager.getNumTuples(LEFT_PARTITION) > 
bufferManager.getNumTuples(RIGHT_PARTITION)) {
             runFileStream[RIGHT_PARTITION].startRunFile();
             if (LOGGER.isLoggable(Level.FINE)) {
@@ -418,6 +431,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", 
Right memory tuples: "
                         + bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
             }
+            bufferManager.printStats("memory details");
             rightSpillCount++;
         } else {
             runFileStream[LEFT_PARTITION].startRunFile();
@@ -426,9 +440,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                         + bufferManager.getNumTuples(LEFT_PARTITION) + ", 
Right memory tuples: "
                         + bufferManager.getNumTuples(RIGHT_PARTITION) + ")");
             }
+            bufferManager.printStats("memory details");
             leftSpillCount++;
         }
-        spillCount++;
     }
 
     private void continueStream(int diskPartition, ITupleAccessor accessor) 
throws HyracksDataException {
@@ -443,12 +457,14 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
     private void unfreezeAndContinue(int frozenPartition, ITupleAccessor 
accessor) throws HyracksDataException {
         int flushPartition = frozenPartition == LEFT_PARTITION ? 
RIGHT_PARTITION : LEFT_PARTITION;
         if (LOGGER.isLoggable(Level.WARNING)) {
-            LOGGER.warning("snapshot(" + frozenPartition + "): " + 
frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
-                    + " left, left[" + leftSpillCount + " spills, "
+            LOGGER.warning("snapshot(" + frozenPartition + "): " + 
frameCounts[RIGHT_PARTITION] + " right, "
+                    + frameCounts[LEFT_PARTITION] + " left, left[" + 
bufferManager.getNumTuples(LEFT_PARTITION)
+                    + " memory, " + leftSpillCount + " spills, "
                     + (runFileStream[LEFT_PARTITION].getFileCount() - 
spillFileCount[LEFT_PARTITION]) + " files, "
                     + (runFileStream[LEFT_PARTITION].getWriteCount() - 
spillWriteCount[LEFT_PARTITION]) + " written, "
-                    + (runFileStream[LEFT_PARTITION].getReadCount() - 
spillReadCount[LEFT_PARTITION]) + " read]. right[" + rightSpillCount
-                    + " spills, " + 
(runFileStream[RIGHT_PARTITION].getFileCount() - 
spillFileCount[RIGHT_PARTITION]) + " files, "
+                    + (runFileStream[LEFT_PARTITION].getReadCount() - 
spillReadCount[LEFT_PARTITION]) + " read]. right["
+                    + bufferManager.getNumTuples(RIGHT_PARTITION) + " memory, 
" + +rightSpillCount + " spills, "
+                    + (runFileStream[RIGHT_PARTITION].getFileCount() - 
spillFileCount[RIGHT_PARTITION]) + " files, "
                     + (runFileStream[RIGHT_PARTITION].getWriteCount() - 
spillWriteCount[RIGHT_PARTITION]) + " written, "
                     + (runFileStream[RIGHT_PARTITION].getReadCount() - 
spillReadCount[RIGHT_PARTITION]) + " read].");
             spillFileCount[LEFT_PARTITION] = 
runFileStream[LEFT_PARTITION].getFileCount();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
index 88ff727..aeea209 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/InMemoryIntervalPartitionJoin.java
@@ -69,7 +69,9 @@ public class InMemoryIntervalPartitionJoin {
     public void join(IFrameTupleAccessor accessorProbe, int probeTupleIndex, 
IFrameWriter writer)
             throws HyracksDataException {
         if (fbm.getNumFrames() != 0) {
-            for (int frameIndex = 0; frameIndex < fbm.getNumFrames(); 
++frameIndex) {
+            fbm.resetIterator();
+            int frameIndex = fbm.next();
+            while (fbm.exists()) {
                 fbm.getFrame(frameIndex, bufferInfo);
                 accessorBuild.reset(bufferInfo.getBuffer());
                 for (int buildTupleIndex = 0; buildTupleIndex < 
accessorBuild.getTupleCount(); ++buildTupleIndex) {
@@ -79,6 +81,7 @@ public class InMemoryIntervalPartitionJoin {
                     }
                     joinComparisonCount++;
                 }
+                frameIndex = fbm.next();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
index 1118bf3..5741071 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IFrameBufferManager.java
@@ -36,7 +36,8 @@ public interface IFrameBufferManager {
 
     /**
      * @param frameIndex
-     * @param bufferInfo the given object need to be reset
+     * @param bufferInfo
+     *            the given object need to be reset
      * @return the filled bufferInfo to facilitate the chain access
      */
     BufferInfo getFrame(int frameIndex, BufferInfo bufferInfo);
@@ -49,11 +50,30 @@ public interface IFrameBufferManager {
     /**
      * Writes the whole frame into the buffer.
      *
-     * @param frame source frame
+     * @param frame
+     *            source frame
      * @return the id of the inserted frame. return -1 if it failed to insert
      */
     int insertFrame(ByteBuffer frame) throws HyracksDataException;
 
+    /**
+     * Removes the frame from the buffer manager
+     *
+     * @param frameIndex
+     */
+    void removeFrame(int frameIndex);
+
     void close();
 
+    /**
+     * Create an iterator for frames.
+     *
+     * Allows the reuse of frame ids.
+     */
+    int next();
+
+    boolean exists();
+
+    void resetIterator();
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
index df98e88..52a0918 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedDeletableTupleBufferManager.java
@@ -26,4 +26,6 @@ public interface IPartitionedDeletableTupleBufferManager 
extends IPartitionedTup
 
     void deleteTuple(int partition, TuplePointer tuplePointer) throws 
HyracksDataException;
 
+    void printStats(String string);
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
index 1e1127d..6802e19 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionDeletableTupleBufferManager.java
@@ -34,7 +34,7 @@ import 
org.apache.hyracks.dataflow.std.structures.TuplePointer;
 public class VPartitionDeletableTupleBufferManager extends 
VPartitionTupleBufferManager
         implements IPartitionedDeletableTupleBufferManager {
 
-    private static int[] minFreeSpace;
+    private final int[] minFreeSpace;
     private final IAppendDeletableFrameTupleAccessor[] accessor;
     private final IFrameFreeSlotPolicy[] policy;
 
@@ -85,8 +85,11 @@ public class VPartitionDeletableTupleBufferManager extends 
VPartitionTupleBuffer
     public void clearPartition(int partitionId) throws HyracksDataException {
         IFrameBufferManager partition = partitionArray[partitionId];
         if (partition != null) {
-            for (int i = 0; i < partition.getNumFrames(); ++i) {
+            partition.resetIterator();
+            int i = partition.next();
+            while (partition.exists()) {
                 accessor[partitionId].clear(partition.getFrame(i, 
tempInfo).getBuffer());
+                i = partition.next();
             }
         }
         policy[partitionId].reset();
@@ -94,22 +97,36 @@ public class VPartitionDeletableTupleBufferManager extends 
VPartitionTupleBuffer
     }
 
     private void reOrganizeFrames(int partition) {
+        System.err.printf("reOrganizeFrames -- %d:[", partition);
         policy[partition].reset();
-        for (int i = 0; i < partitionArray[partition].getNumFrames(); i++) {
-            partitionArray[partition].getFrame(i, tempInfo);
+        partitionArray[partition].resetIterator();
+        int f = partitionArray[partition].next();
+        while (partitionArray[partition].exists()) {
+            partitionArray[partition].getFrame(f, tempInfo);
             accessor[partition].reset(tempInfo.getBuffer());
             accessor[partition].reOrganizeBuffer();
-            policy[partition].pushNewFrame(i, 
accessor[partition].getContiguousFreeSpace());
+            if (accessor[partition].getTupleCount() == 0) {
+                partitionArray[partition].removeFrame(f);
+                framePool.deAllocateBuffer(tempInfo.getBuffer());
+            } else {
+                policy[partition].pushNewFrame(f, 
accessor[partition].getContiguousFreeSpace());
+                accessor[partition].printStats(System.err);
+            }
+            f = partitionArray[partition].next();
         }
+        System.err.println("] ");
     }
 
     private boolean canBeInsertedAfterCleanUpFragmentation(int partition, int 
requiredFreeSpace) {
-        for (int i = 0; i < partitionArray[partition].getNumFrames(); i++) {
+        partitionArray[partition].resetIterator();
+        int i = partitionArray[partition].next();
+        while (partitionArray[partition].exists()) {
             partitionArray[partition].getFrame(i, tempInfo);
             accessor[partition].reset(tempInfo.getBuffer());
             if (accessor[partition].getTotalFreeSpace() >= requiredFreeSpace) {
                 return true;
             }
+            i = partitionArray[partition].next();
         }
         return false;
     }
@@ -135,6 +152,27 @@ public class VPartitionDeletableTupleBufferManager extends 
VPartitionTupleBuffer
         return recordDescriptor.getFieldCount() * 4 + 4;
     }
 
+    public void printStats(String message) {
+        System.err.print(String.format("%1$-" + 15 + "s", message) + " --");
+
+        for (int p = 0; p < partitionArray.length; ++p) {
+            System.err.printf("%d:[", p);
+            IFrameBufferManager partition = partitionArray[p];
+            if (partition != null) {
+                partitionArray[p].resetIterator();
+                int f = partitionArray[p].next();
+                while (partitionArray[p].exists()) {
+                    partitionArray[p].getFrame(f, tempInfo);
+                    accessor[p].reset(tempInfo.getBuffer());
+                    accessor[p].printStats(System.err);
+                    f = partitionArray[p].next();
+                }
+            }
+            System.err.printf("] ");
+        }
+        System.err.println();
+    }
+
     @Override
     public void deleteTuple(int partition, TuplePointer tuplePointer) throws 
HyracksDataException {
         partitionArray[parsePartitionId(tuplePointer.getFrameIndex())]

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4a4cb5d..a094061 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -110,8 +110,11 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
     public void clearPartition(int partitionId) throws HyracksDataException {
         IFrameBufferManager partition = partitionArray[partitionId];
         if (partition != null) {
-            for (int i = 0; i < partition.getNumFrames(); ++i) {
+            partition.resetIterator();
+            int i = partition.next();
+            while (partition.exists()) {
                 framePool.deAllocateBuffer(partition.getFrame(i, 
tempInfo).getBuffer());
+                i = partition.next();
             }
             partition.reset();
         }
@@ -221,11 +224,13 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
 
     static class PartitionFrameBufferManager implements IFrameBufferManager {
 
+        int size = 0;
         ArrayList<ByteBuffer> buffers = new ArrayList<>();
 
         @Override
         public void reset() throws HyracksDataException {
             buffers.clear();
+            size = 0;
         }
 
         @Override
@@ -236,13 +241,35 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
 
         @Override
         public int getNumFrames() {
-            return buffers.size();
+            return size;
         }
 
         @Override
         public int insertFrame(ByteBuffer frame) throws HyracksDataException {
-            buffers.add(frame);
-            return buffers.size() - 1;
+            int index = -1;
+            if (buffers.size() == size) {
+                buffers.add(frame);
+                index = buffers.size() - 1;
+            } else {
+                for (int i = 0; i < buffers.size(); ++i) {
+                    if (buffers.get(i) == null) {
+                        buffers.set(i, frame);
+                        index = i;
+                        break;
+                    }
+                }
+            }
+            if (index == -1) {
+                throw new HyracksDataException("Did not insert frame.");
+            }
+            size++;
+            return index;
+        }
+
+        @Override
+        public void removeFrame(int frameIndex) {
+            buffers.set(frameIndex, null);
+            size--;
         }
 
         @Override
@@ -250,6 +277,28 @@ public class VPartitionTupleBufferManager implements 
IPartitionedTupleBufferMana
             buffers = null;
         }
 
+        int iterator = -1;
+
+        @Override
+        public int next() {
+            while (++iterator < buffers.size()) {
+                if (buffers.get(iterator) != null) {
+                    break;
+                }
+            }
+            return iterator;
+        }
+
+        @Override
+        public boolean exists() {
+            return iterator < buffers.size() && buffers.get(iterator) != null;
+        }
+
+        @Override
+        public void resetIterator() {
+            iterator = -1;
+        }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
index 6604ba8..7c750d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableFrameMemoryManager.java
@@ -104,10 +104,33 @@ public class VariableFrameMemoryManager implements 
IFrameBufferManager {
     }
 
     @Override
+    public void removeFrame(int frameIndex)  {
+        logicalFrameStartSizes.remove(frameIndex);
+    }
+
+    @Override
     public void close() {
         physicalFrameOffsets.clear();
         logicalFrameStartSizes.clear();
         freeSlotPolicy.reset();
         framePool.close();
     }
+
+    @Override
+    public int next() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public boolean exists() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void resetIterator() {
+        // TODO Auto-generated method stub
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
index c1a828f..9c93828 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java
@@ -60,9 +60,9 @@ public class MergeJoiner extends AbstractMergeJoiner {
 
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
-//    private long spillFileCount = 0;
-//    private long spillWriteCount = 0;
-//    private long spillReadCount = 0;
+    private long spillFileCount = 0;
+    private long spillWriteCount = 0;
+    private long spillReadCount = 0;
     private long spillCount = 0;
 
     public MergeJoiner(IHyracksTaskContext ctx, int memorySize, int partition, MergeStatus status, MergeJoinLocks locks,
@@ -196,7 +196,8 @@ public class MergeJoiner extends AbstractMergeJoiner {
             if (status.branch[LEFT_PARTITION].isRunFileWriting()) {
                 // Left side from disk
                 leftTs = processLeftTupleSpill(writer);
-            } else if (rightTs.isLoaded() && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
+            } else if (rightTs.isLoaded()
+                    && mjc.checkToLoadNextRightTuple(inputAccessor[LEFT_PARTITION], inputAccessor[RIGHT_PARTITION])) {
                 // Right side from stream
                 processRightTuple();
                 rightTs = loadRightTuple();
@@ -227,15 +228,15 @@ public class MergeJoiner extends AbstractMergeJoiner {
         if (memoryHasTuples()) {
             for (int i = memoryBuffer.size() - 1; i > -1; --i) {
                 memoryAccessor.reset(memoryBuffer.get(i));
-                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
-                        memoryBuffer.get(i).getTupleIndex(), false)) {
+                if (mjc.checkToSaveInResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex(), false)) {
                     // add to result
-                    addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
-                            memoryBuffer.get(i).getTupleIndex(), writer);
+                    addToResult(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                            memoryAccessor, memoryBuffer.get(i).getTupleIndex(), writer);
                 }
                 joinComparisonCount++;
-                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(), memoryAccessor,
-                        memoryBuffer.get(i).getTupleIndex())) {
+                if (mjc.checkToRemoveInMemory(inputAccessor[LEFT_PARTITION], inputAccessor[LEFT_PARTITION].getTupleId(),
+                        memoryAccessor, memoryBuffer.get(i).getTupleIndex())) {
                     // remove from memory
                     //                    TuplePrinterUtil.printTuple("Remove Memory", memoryAccessor, memoryBuffer.get(i).getTupleIndex());
                     removeFromMemory(memoryBuffer.get(i));
@@ -259,7 +260,12 @@ public class MergeJoiner extends AbstractMergeJoiner {
     }
 
     private void freezeAndSpill() throws HyracksDataException {
-//        System.err.println("freezeAndSpill");
+        //        System.err.println("freezeAndSpill");
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("freeze snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+                    + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
+                    + bufferManager.getNumTuples() + " tuples memory].");
+        }
 
         runFileStream.startRunFile();
         if (LOGGER.isLoggable(Level.FINE)) {
@@ -270,7 +276,7 @@ public class MergeJoiner extends AbstractMergeJoiner {
     }
 
     private void continueStream(ITupleAccessor accessor) throws HyracksDataException {
-//        System.err.println("continueStream");
+        //        System.err.println("continueStream");
 
         runFileStream.closeRunFile();
         accessor.reset(inputBuffer[LEFT_PARTITION]);
@@ -281,17 +287,18 @@ public class MergeJoiner extends AbstractMergeJoiner {
     }
 
     private void unfreezeAndContinue(ITupleAccessor accessor) throws HyracksDataException {
-//        System.err.println("unfreezeAndContinue");
-//        if (LOGGER.isLoggable(Level.WARNING)) {
-//            LOGGER.warning("snapshot: " + frameCounts[RIGHT] + " right, " + frameCounts[LEFT] + " left, "
-//                    + joinComparisonCount + " comparisons, " + joinResultCount + " results, " + spillCount + " spills, "
-//                    + (runFileStream.getFileCount() - spillFileCount) + " files, "
-//                    + (runFileStream.getWriteCount() - spillWriteCount) + " spill frames written, "
-//                    + (runFileStream.getReadCount() - spillReadCount) + " spill frames read.");
-//            spillFileCount = runFileStream.getFileCount();
-//            spillReadCount = runFileStream.getReadCount();
-//            spillWriteCount = runFileStream.getWriteCount();
-//        }
+        //        System.err.println("unfreezeAndContinue");
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.warning("snapshot: " + frameCounts[RIGHT_PARTITION] + " right, " + frameCounts[LEFT_PARTITION]
+                    + " left, " + joinComparisonCount + " comparisons, " + joinResultCount + " results, ["
+                    + bufferManager.getNumTuples() + " tuples memory, " + spillCount + " spills, "
+                    + (runFileStream.getFileCount() - spillFileCount) + " files, "
+                    + (runFileStream.getWriteCount() - spillWriteCount) + " written, "
+                    + (runFileStream.getReadCount() - spillReadCount) + " read].");
+            spillFileCount = runFileStream.getFileCount();
+            spillReadCount = runFileStream.getReadCount();
+            spillWriteCount = runFileStream.getWriteCount();
+        }
 
         runFileStream.flushAndStopRunFile(accessor);
         flushMemory();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 042b85e..2513b1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -145,6 +145,7 @@ public class RunFileStream {
                     loadNextBuffer(accessor);
                 }
             }
+            runFileReader.close();
         }
 
         // Flush buffer.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 7d4db64..79aba3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.dataflow.std.sort.util;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
@@ -42,7 +43,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
     private int tupleCount;
     private int freeDataEndOffset;
     private int deletedSpace;
-    private byte[] array;   // to speed up the array visit a little
+    private byte[] array; // to speed up the array visit a little
 
     public DeletableFrameTupleAppender(RecordDescriptor recordDescriptor) {
         this.recordDescriptor = recordDescriptor;
@@ -146,7 +147,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
             endOffset = getTupleEndOffset(i);
             if (endOffset >= 0) {
                 int length = endOffset - startOffset;
-                assert ( length >= 0);
+                assert length >= 0;
                 if (freeDataEndOffset != startOffset) {
                     System.arraycopy(array, startOffset, array, freeDataEndOffset, length);
                 }
@@ -162,7 +163,7 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
     private void reclaimDeletedEnding() {
         for (int i = tupleCount - 1; i >= 0; i--) {
             int endOffset = getTupleEndOffset(i);
-            if (endOffset < 0) {
+            if (endOffset <= 0) {
                 tupleCount--;
             } else {
                 break;
@@ -240,9 +241,28 @@ public class DeletableFrameTupleAppender implements IAppendDeletableFrameTupleAc
         return tupleCount;
     }
 
+    private int getLiveTupleCount() {
+        int live = 0;
+        for (int i = tupleCount - 1; i >= 0; i--) {
+            int endOffset = getTupleEndOffset(i);
+            if (endOffset > 0) {
+                live++;
+            }
+        }
+        return live;
+    }
+
     @Override
     public ByteBuffer getBuffer() {
         return buffer;
     }
 
+    @Override
+    public void printStats(PrintStream ps) {
+        if (getLiveTupleCount() == 0) {
+            ps.print("");
+        }
+        ps.printf("(%d, %d)", getLiveTupleCount(), getPhysicalTupleCount());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/6c312141/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
index 31ea07d..e7d1ceb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/IAppendDeletableFrameTupleAccessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.hyracks.dataflow.std.sort.util;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -73,4 +74,6 @@ public interface IAppendDeletableFrameTupleAccessor extends IFrameTupleAccessor
      * @return how many contiguous free space in the buffer.
      */
     int getContiguousFreeSpace();
+
+    void printStats(PrintStream ps);
 }

Reply via email to