Snapshot after working frame tuple appender.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/23eab43d
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/23eab43d
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/23eab43d

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 23eab43dda9860030c2065343208a05dcc437486
Parents: fd514a0
Author: Preston Carman <prest...@apache.org>
Authored: Sun Sep 25 09:37:33 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Sun Sep 25 09:37:33 2016 -0700

----------------------------------------------------------------------
 .../intervalindex/IntervalIndexJoiner.java      | 77 +++++++++++---------
 ...IntervalPartitionJoinOperatorDescriptor.java |  2 +
 .../IntervalPartitionJoiner.java                | 15 +++-
 .../sort/util/DeletableFrameTupleAppender.java  |  1 +
 .../util/DeletableFrameTupleAppenderTest.java   | 50 +++++++++----
 5 files changed, 93 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index d3aaa65..a4ad666 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.runtime.operators.joins.intervalindex;
 
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -62,7 +61,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
     private final int[] streamIndex;
     private final RunFileStream[] runFileStream;
 
-    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
+//    private final LinkedList<TuplePointer> buffer = new LinkedList<>();
 
     private final IIntervalMergeJoinChecker imjc;
 
@@ -124,6 +123,8 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         runFileStream[LEFT_PARTITION] = new RunFileStream(ctx, "left", 
status.branch[LEFT_PARTITION]);
         runFileStream[RIGHT_PARTITION] = new RunFileStream(ctx, "right", 
status.branch[RIGHT_PARTITION]);
 
+        LOGGER.setLevel(Level.FINE);
+        System.out.println("IntervalIndexJoiner: Logging level is: " + 
LOGGER.getLevel());
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("IntervalIndexJoiner has started partition " + 
partition + " with " + memorySize
                     + " frames of memory.");
@@ -246,9 +247,11 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         long leftStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[LEFT_PARTITION], leftKey);
         long rightStart = 
IntervalJoinUtil.getIntervalStart(inputAccessor[RIGHT_PARTITION], rightKey);
         if (leftStart < rightStart) {
+            // Left stream has next tuple, check if right active must be 
updated first.
             return activeManager[RIGHT_PARTITION].hasRecords()
                     && activeManager[RIGHT_PARTITION].getTopPoint() < 
leftStart;
         } else {
+            // Right stream has next tuple, check if left active must be 
update first.
             return !(activeManager[LEFT_PARTITION].hasRecords()
                     && activeManager[LEFT_PARTITION].getTopPoint() < 
rightStart);
         }
@@ -334,7 +337,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
                 if 
(activeManager[LEFT_PARTITION].addTuple(inputAccessor[LEFT_PARTITION], tp)) {
-                    buffer.add(tp);
+                    
processTupleJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION],
+                            inputAccessor[LEFT_PARTITION], true, writer);
+//                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -348,10 +353,10 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         } while (loadLeftTuple().isLoaded() && loadRightTuple().isLoaded() && 
!checkToProcessRightTuple());
 
         // Add Results
-        if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
-                    memoryAccessor[LEFT_PARTITION], true, writer);
-        }
+//        if (!buffer.isEmpty()) {
+//            
processActiveJoin(activeManager[RIGHT_PARTITION].getActiveList(), 
memoryAccessor[RIGHT_PARTITION], buffer,
+//                    memoryAccessor[LEFT_PARTITION], true, writer);
+//        }
     }
 
     private void processRightTuple(IFrameWriter writer) throws 
HyracksDataException {
@@ -364,7 +369,9 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
                 // Add to active, end point index and buffer.
                 TuplePointer tp = new TuplePointer();
                 if 
(activeManager[RIGHT_PARTITION].addTuple(inputAccessor[RIGHT_PARTITION], tp)) {
-                    buffer.add(tp);
+                    
processTupleJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION],
+                            inputAccessor[RIGHT_PARTITION], false, writer);
+//                    buffer.add(tp);
                 } else {
                     // Spill case
                     freezeAndSpill();
@@ -378,32 +385,32 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
         } while (loadRightTuple().isLoaded() && checkToProcessRightTuple());
 
         // Add Results
-        if (!buffer.isEmpty()) {
-            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
-                    memoryAccessor[RIGHT_PARTITION], false, writer);
-        }
+//        if (!buffer.isEmpty()) {
+//            processActiveJoin(activeManager[LEFT_PARTITION].getActiveList(), 
memoryAccessor[LEFT_PARTITION], buffer,
+//                    memoryAccessor[RIGHT_PARTITION], false, writer);
+//        }
     }
 
-    private void processActiveJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
-            List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, 
boolean reversed, IFrameWriter writer)
-            throws HyracksDataException {
-        for (TuplePointer outerTp : outer) {
-            outerAccessor.reset(outerTp);
-            for (TuplePointer innerTp : inner) {
-                innerAccessor.reset(innerTp);
-                if (imjc.checkToSaveInResult(outerAccessor, 
outerTp.getTupleIndex(), innerAccessor,
-                        innerTp.getTupleIndex(), reversed)) {
-                    addToResult(outerAccessor, outerTp.getTupleIndex(), 
innerAccessor, innerTp.getTupleIndex(),
-                            reversed, writer);
-                }
-                joinComparisonCount++;
-            }
-        }
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
-        }
-        buffer.clear();
-    }
+//    private void processActiveJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
+//            List<TuplePointer> inner, ITuplePointerAccessor innerAccessor, 
boolean reversed, IFrameWriter writer)
+//            throws HyracksDataException {
+//        for (TuplePointer outerTp : outer) {
+//            outerAccessor.reset(outerTp);
+//            for (TuplePointer innerTp : inner) {
+//                innerAccessor.reset(innerTp);
+//                if (imjc.checkToSaveInResult(outerAccessor, 
outerTp.getTupleIndex(), innerAccessor,
+//                        innerTp.getTupleIndex(), reversed)) {
+//                    addToResult(outerAccessor, outerTp.getTupleIndex(), 
innerAccessor, innerTp.getTupleIndex(),
+//                            reversed, writer);
+//                }
+//                joinComparisonCount++;
+//            }
+//        }
+//        if (LOGGER.isLoggable(Level.FINE)) {
+//            LOGGER.fine("Sweep for " + buffer.size() + " tuples");
+//        }
+//        buffer.clear();
+//    }
 
     private void processTupleJoin(List<TuplePointer> outer, 
ITuplePointerAccessor outerAccessor,
             ITupleAccessor tupleAccessor, boolean reversed, IFrameWriter 
writer) throws HyracksDataException {
@@ -456,6 +463,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
     private void unfreezeAndContinue(int frozenPartition, ITupleAccessor 
accessor) throws HyracksDataException {
         int flushPartition = frozenPartition == LEFT_PARTITION ? 
RIGHT_PARTITION : LEFT_PARTITION;
+        runFileStream[frozenPartition].flushAndStopRunFile(accessor);
         if (LOGGER.isLoggable(Level.WARNING)) {
             LOGGER.warning("snapshot(" + frozenPartition + "): " + 
frameCounts[RIGHT_PARTITION] + " right, "
                     + frameCounts[LEFT_PARTITION] + " left, left[" + 
bufferManager.getNumTuples(LEFT_PARTITION)
@@ -474,8 +482,6 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
             spillReadCount[RIGHT_PARTITION] = 
runFileStream[RIGHT_PARTITION].getReadCount();
             spillWriteCount[RIGHT_PARTITION] = 
runFileStream[RIGHT_PARTITION].getWriteCount();
         }
-
-        runFileStream[frozenPartition].flushAndStopRunFile(accessor);
         flushMemory(flushPartition);
         if ((LEFT_PARTITION == frozenPartition && 
!status.branch[LEFT_PARTITION].isRunFileReading())
                 || (RIGHT_PARTITION == frozenPartition && 
!status.branch[RIGHT_PARTITION].isRunFileReading())) {
@@ -489,8 +495,7 @@ public class IntervalIndexJoiner extends 
AbstractMergeJoiner {
 
     @Override
     public void closeInput(int partition) throws HyracksDataException {
-        // TODO Auto-generated method stub
-
+        // No changes are required.
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 6ea1e6f..c7986e6 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -160,6 +160,8 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, 
buildHpc, probeHpc);
 
                     state.ipj.initBuild();
+                    LOGGER.setLevel(Level.FINE);
+                    
System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: 
" + LOGGER.getLevel());
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("IntervalPartitionJoin is starting the 
build phase with " + state.k
                                 + " granules repesenting " + 
state.intervalPartitions + " interval partitions using "

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index e943a48..31e200b 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -100,6 +100,8 @@ public class IntervalPartitionJoiner {
     private long spillCount = 0;
     private long spillReadCount = 0;
     private long spillWriteCount = 0;
+    private long buildSize;
+    private int tmp = -1;
 
     public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, 
int k, int numOfPartitions,
             String buildRelName, String probeRelName, 
IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
@@ -129,6 +131,8 @@ public class IntervalPartitionJoiner {
     public void initBuild() throws HyracksDataException {
         buildBufferManager = new VPartitionTupleBufferManager(ctx, 
getPartitionMemoryConstrain(), numOfPartitions,
                 memForJoin * ctx.getInitialFrameSize());
+        System.err.println("k: " + k);
+        buildSize = 0;
     }
 
     private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
@@ -139,14 +143,23 @@ public class IntervalPartitionJoiner {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
 
+        int pid;
         for (int i = 0; i < tupleCount; ++i) {
-            int pid = buildHpc.partition(accessorBuild, i, k);
+            pid = buildHpc.partition(accessorBuild, i, k);
+
+            if (tmp != pid) {
+                System.err.println("buildSize: " + buildSize + " pid: " + pid 
+ " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k));
+                tmp = pid;
+            }
             processTuple(i, pid);
             ipjd.buildIncrementCount(pid);
+            buildSize++;
         }
     }
 
     public void closeBuild() throws HyracksDataException {
+        System.err.println("buildSize: " + buildSize);
+
         int inMemoryPartitions = 0;
         int totalBuildPartitions = 0;
         flushAndClearBuildSpilledPartition();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
index 8cae721..d242daa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppender.java
@@ -181,6 +181,7 @@ public class DeletableFrameTupleAppender implements 
IAppendDeletableFrameTupleAc
         this.array = buffer.array();
         setIndexCount(0);
         setDeletedSpace(0);
+        setNextIndex(0);
         setTupleAppend(0);
         resetCounts();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/23eab43d/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
index 7686540..af3cdfc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/sort/util/DeletableFrameTupleAppenderTest.java
@@ -37,15 +37,19 @@ import org.apache.hyracks.util.string.UTF8StringUtil;
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * @see org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender
+ */
 public class DeletableFrameTupleAppenderTest {
+    private static final int META_DATA_SIZE = 4 + 4 + 4 + 4;
+    private static final int SLOT_SIZE = 4 + 4;
+    private static final char TEST_CH = 'x';
+
     DeletableFrameTupleAppender appender;
-    ISerializerDeserializer[] fields = new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE,
-            new UTF8StringSerializerDeserializer(),
-    };
+    ISerializerDeserializer[] fields = new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), };
     RecordDescriptor recordDescriptor = new RecordDescriptor(fields);
     ArrayTupleBuilder builder = new 
ArrayTupleBuilder(recordDescriptor.getFieldCount());
-    static final char TEST_CH = 'x';
 
     int cap = 256;
 
@@ -60,26 +64,42 @@ public class DeletableFrameTupleAppenderTest {
         appender.clear(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == 0);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+        assertTrue(appender.getTotalFreeSpace() == cap - META_DATA_SIZE);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
     }
 
     ByteBuffer makeAFrame(int capacity, int count, int deletedBytes) throws 
HyracksDataException {
         ByteBuffer buffer = ByteBuffer.allocate(capacity);
         int metaOffset = capacity - 4;
+        buffer.putInt(metaOffset, count);
+        metaOffset -= 4;
         buffer.putInt(metaOffset, deletedBytes);
+        // next index
         metaOffset -= 4;
         buffer.putInt(metaOffset, count);
+        // append slot
+        metaOffset -= 4;
+        int appendOffset = metaOffset;
+        buffer.putInt(metaOffset, 0);
         metaOffset -= 4;
+
+        int start = 0;
         for (int i = 0; i < count; i++, metaOffset -= 4) {
             makeARecord(builder, i);
             for (int x = 0; x < builder.getFieldEndOffsets().length; x++) {
                 buffer.putInt(builder.getFieldEndOffsets()[x]);
             }
             buffer.put(builder.getByteArray(), 0, builder.getSize());
-            assert (metaOffset > buffer.position());
+
+            // Add slot information
+            buffer.putInt(metaOffset, start);
+            metaOffset -= 4;
             buffer.putInt(metaOffset, buffer.position());
 
+            start = buffer.position();
+            assert (metaOffset > buffer.position());
         }
+        buffer.putInt(appendOffset, start);
         return buffer;
     }
 
@@ -110,16 +130,16 @@ public class DeletableFrameTupleAppenderTest {
         appender.reset(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == 0);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE);
 
-        int count = 10;
+        int count = 8;
         int deleted = 7;
         buffer = makeAFrame(cap, count, deleted);
         int pos = buffer.position();
         appender.reset(buffer);
         assertTrue(appender.getBuffer() == buffer);
         assertTrue(appender.getTupleCount() == count);
-        assertTrue(appender.getContiguousFreeSpace() == cap - 4 - 4 - count * 
4 - pos);
+        assertTrue(appender.getContiguousFreeSpace() == cap - META_DATA_SIZE - 
count * SLOT_SIZE - pos);
         assertTrue(appender.getTotalFreeSpace() == 
appender.getContiguousFreeSpace() + deleted);
 
         int dataOffset = 0;
@@ -130,7 +150,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testAppend() throws Exception {
-        int count = 10;
+        int count = 8;
         ByteBuffer bufferRead = makeAFrame(cap, count, 0);
         DeletableFrameTupleAppender accessor = new 
DeletableFrameTupleAppender(recordDescriptor);
         accessor.reset(bufferRead);
@@ -146,7 +166,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testDelete() throws Exception {
-        int count = 10;
+        int count = 8;
         int deleteSpace = 0;
         ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
         appender.reset(buffer);
@@ -165,7 +185,7 @@ public class DeletableFrameTupleAppenderTest {
     public void testResetAfterDelete() throws Exception {
         testDelete();
         appender.reset(appender.getBuffer());
-        assertEquals(cap - appender.getTupleCount() * 4 - 4 - 4, 
appender.getTotalFreeSpace());
+        assertEquals(cap - appender.getTupleCount() * SLOT_SIZE - 
META_DATA_SIZE, appender.getTotalFreeSpace());
 
     }
 
@@ -187,7 +207,7 @@ public class DeletableFrameTupleAppenderTest {
     @Test
     public void testAppendAndDelete() throws Exception {
         int cap = 1024;
-        int count = 10;
+        int count = 8;
         int deleteSpace = 0;
         ByteBuffer buffer = makeAFrame(cap, count, deleteSpace);
         int dataOffset = buffer.position();
@@ -221,7 +241,7 @@ public class DeletableFrameTupleAppenderTest {
 
     @Test
     public void testReOrganizeBuffer() throws Exception {
-        int count = 10;
+        int count = 8;
         testDelete();
         appender.reOrganizeBuffer();
         ByteBuffer bufferRead = makeAFrame(cap, count, 0);

Reply via email to