Updated partition algorithm to use build-side-only memory

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/19f0997f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/19f0997f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/19f0997f

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 19f0997ff0a758e5193e531b220ea7c7e3424a43
Parents: f6dba46
Author: Preston Carman <prest...@apache.org>
Authored: Wed Sep 28 16:16:05 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Wed Sep 28 16:16:05 2016 -0700

----------------------------------------------------------------------
 ...IntervalPartitionJoinOperatorDescriptor.java |  12 +-
 .../IntervalPartitionJoiner.java                | 273 +++++++++++--------
 .../dataflow/common/io/RunFileReader.java       |  13 +
 .../dataflow/std/join/RunFileStream.java        |   8 +
 4 files changed, 184 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index c7986e6..60a4697 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -159,7 +159,7 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
                     state.ipj = new IntervalPartitionJoiner(ctx, 
state.memoryForJoin, state.k, state.intervalPartitions,
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, 
buildHpc, probeHpc);
 
-                    state.ipj.initBuild();
+                    state.ipj.buildInit();
                     LOGGER.setLevel(Level.FINE);
                     
System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: 
" + LOGGER.getLevel());
                     if (LOGGER.isLoggable(Level.FINE)) {
@@ -171,13 +171,13 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    state.ipj.build(buffer);
+                    state.ipj.buildStep(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
                     if (!failure) {
-                        state.ipj.closeBuild();
+                        state.ipj.buildClose();
                         ctx.setStateObject(state);
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("IntervalPartitionJoin closed its 
build phase");
@@ -216,7 +216,7 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
                             new TaskId(new ActivityId(getOperatorId(), 
BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
                     writer.open();
-                    state.ipj.initProbe();
+                    state.ipj.probeInit();
 
                     if (LOGGER.isLoggable(Level.FINE)) {
                         LOGGER.fine("IntervalPartitionJoin is starting the 
probe phase.");
@@ -225,7 +225,7 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    state.ipj.probe(buffer, writer);
+                    state.ipj.probeStep(buffer, writer);
                 }
 
                 @Override
@@ -235,7 +235,7 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.ipj.closeProbe(writer);
+                    state.ipj.probeClose(writer);
                     state.ipj.joinSpilledPartitions(writer);
                     state.ipj.closeAndDeleteRunFiles();
                     writer.close();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
index 31e200b..9c5a872 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoiner.java
@@ -31,6 +31,8 @@ import java.util.logging.Logger;
 import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -47,8 +49,10 @@ import 
org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManage
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 /**
- * This class mainly applies one level of HHJ on a pair of
- * relations. It is always called by the descriptor.
+ * The Interval Partition Join runs in three stages: build, probe-in-memory, 
probe-spill.
+ * build: Saves all build partitions either to memory or disk.
+ * probe-in-memory: Joins all in memory partitions and saves the necessary 
partitions to disk.
+ * probe-spill: Spilled build partitions are loaded into memory and joined 
with all remaining probe partitions.
  */
 public class IntervalPartitionJoiner {
 
@@ -59,9 +63,6 @@ public class IntervalPartitionJoiner {
         PROBE
     }
 
-    // Used for special probe BigObject which can not be held into the Join 
memory
-    private FrameTupleAppender bigProbeFrameAppender;
-
     private IHyracksTaskContext ctx;
 
     private final String buildRelName;
@@ -76,13 +77,12 @@ public class IntervalPartitionJoiner {
     private RunFileWriter[] buildRFWriters; //writing spilled build partitions
     private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
 
-    private final int memForJoin;
+    private final int buildMemory;
     private final int k;
     private final int numOfPartitions;
     private InMemoryIntervalPartitionJoin[] inMemJoiner; //Used for joining 
resident partitions
 
     private VPartitionTupleBufferManager buildBufferManager;
-    private VPartitionTupleBufferManager probeBufferManager;
 
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
@@ -97,17 +97,23 @@ public class IntervalPartitionJoiner {
 
     private long joinComparisonCount = 0;
     private long joinResultCount = 0;
-    private long spillCount = 0;
     private long spillReadCount = 0;
     private long spillWriteCount = 0;
     private long buildSize;
+    private long probeSize;
     private int tmp = -1;
 
+    private RunFileWriter probeRunFileWriter = null;
+    private final IFrameTupleAppender probeRunFileAppender;
+    private int probeRunFilePid = -1;
+
     public IntervalPartitionJoiner(IHyracksTaskContext ctx, int memForJoin, 
int k, int numOfPartitions,
             String buildRelName, String probeRelName, 
IIntervalMergeJoinChecker imjc, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, 
ITuplePartitionComputer probeHpc) {
+            RecordDescriptor probeRd, ITuplePartitionComputer buildHpc, 
ITuplePartitionComputer probeHpc)
+            throws HyracksDataException {
         this.ctx = ctx;
-        this.memForJoin = memForJoin;
+        // TODO fix available memory size
+        this.buildMemory = memForJoin;
         this.k = k;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
@@ -125,21 +131,19 @@ public class IntervalPartitionJoiner {
         this.accessorBuild = new FrameTupleAccessor(buildRd);
         this.accessorProbe = new FrameTupleAccessor(probeRd);
 
+        reloadBuffer = new VSizeFrame(ctx);
+        probeRunFileAppender = new FrameTupleAppender(new VSizeFrame(ctx));
         ipjd = new IntervalPartitionJoinData(k, imjc, numOfPartitions);
     }
 
-    public void initBuild() throws HyracksDataException {
-        buildBufferManager = new VPartitionTupleBufferManager(ctx, 
getPartitionMemoryConstrain(), numOfPartitions,
-                memForJoin * ctx.getInitialFrameSize());
+    public void buildInit() throws HyracksDataException {
+        buildBufferManager = new VPartitionTupleBufferManager(ctx, 
VPartitionTupleBufferManager.NO_CONSTRAIN,
+                numOfPartitions, buildMemory * ctx.getInitialFrameSize());
         System.err.println("k: " + k);
         buildSize = 0;
     }
 
-    private IPartitionedMemoryConstrain getPartitionMemoryConstrain() {
-        return VPartitionTupleBufferManager.NO_CONSTRAIN;
-    }
-
-    public void build(ByteBuffer buffer) throws HyracksDataException {
+    public void buildStep(ByteBuffer buffer) throws HyracksDataException {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
 
@@ -148,7 +152,8 @@ public class IntervalPartitionJoiner {
             pid = buildHpc.partition(accessorBuild, i, k);
 
             if (tmp != pid) {
-                System.err.println("buildSize: " + buildSize + " pid: " + pid 
+ " k: " + k + " pair: " + IntervalPartitionUtil.getIntervalPartition(pid, k));
+                System.err.println("buildSize: " + buildSize + " pid: " + pid 
+ " k: " + k + " pair: "
+                        + IntervalPartitionUtil.getIntervalPartition(pid, k));
                 tmp = pid;
             }
             processTuple(i, pid);
@@ -157,7 +162,7 @@ public class IntervalPartitionJoiner {
         }
     }
 
-    public void closeBuild() throws HyracksDataException {
+    public void buildClose() throws HyracksDataException {
         System.err.println("buildSize: " + buildSize);
 
         int inMemoryPartitions = 0;
@@ -165,7 +170,7 @@ public class IntervalPartitionJoiner {
         flushAndClearBuildSpilledPartition();
 
         // Trying to bring back as many spilled partitions as possible, making 
them resident
-        bringBackSpilledPartitionIfHasMoreMemory();
+        bringBackSpilledPartitionIfHasMoreMemory(false);
 
         // Update build partition join map based on partitions with actual 
data.
         for (int i = ipjd.buildNextInMemory(0); i >= 0; i = 
ipjd.buildNextInMemory(i + 1)) {
@@ -239,7 +244,6 @@ public class IntervalPartitionJoiner {
 
     private void spillPartition(int pid) throws HyracksDataException {
         RunFileWriter writer = getSpillWriterOrCreateNewOneIfNotExist(pid, 
SIDE.BUILD);
-        spillCount++;
         spillWriteCount += buildBufferManager.getNumFrames(pid);
         buildBufferManager.flushPartition(pid, writer);
         buildBufferManager.clearPartition(pid);
@@ -274,9 +278,9 @@ public class IntervalPartitionJoiner {
         for (int pid = 0; pid < numOfPartitions; ++pid) {
             if (buildBufferManager.getNumTuples(pid) > 0) {
                 buildBufferManager.clearPartition(pid);
-                ipjd.buildRemoveFromJoin(pid);
             }
         }
+        ipjd.buildClearMemory();
     }
 
     private void flushAndClearBuildSpilledPartition() throws 
HyracksDataException {
@@ -291,63 +295,88 @@ public class IntervalPartitionJoiner {
         }
     }
 
-    private void flushAndClearProbeSpilledPartition() throws 
HyracksDataException {
-        for (int pid = 0; pid < numOfPartitions; ++pid) {
-            if (probeBufferManager.getNumTuples(pid) > 0) {
-                spillWriteCount += probeBufferManager.getNumFrames(pid);
-                RunFileWriter runFileWriter = 
getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-                probeBufferManager.flushPartition(pid, runFileWriter);
-                probeBufferManager.clearPartition(pid);
-                probeRFWriters[pid].close();
-            }
+    private void flushProbeSpilledPartition() throws HyracksDataException {
+        if (probeRunFileWriter != null) {
+            // flush previous runFile
+            probeRunFileAppender.write(probeRunFileWriter, true);
+            probeRunFileWriter.close();
+            spillWriteCount++;
         }
     }
 
-    private void bringBackSpilledPartitionIfHasMoreMemory() throws 
HyracksDataException {
-        // we need number of |spilledPartitions| buffers to store the probe 
data
-        int freeSpace = (memForJoin - ipjd.buildGetSpilledCount()) * 
ctx.getInitialFrameSize();
+    private void bringBackSpilledPartitionIfHasMoreMemory(boolean 
partitalLoad) throws HyracksDataException {
+        int freeFrames = buildMemory;
         for (int i = ipjd.buildNextInMemoryWithResults(0); i >= 0; i = 
ipjd.buildNextInMemoryWithResults(i + 1)) {
-            freeSpace -= buildBufferManager.getPhysicalSize(i);
+            freeFrames -= buildBufferManager.getNumFrames(i);
         }
 
         int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
+        while ((pid = selectPartitionsToReload(freeFrames, pid, partitalLoad)) 
>= 0 && freeFrames > 0) {
+            if (pid == 225) {
+                int i = 0;
+            }
             if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
                 return;
             }
-            freeSpace -= buildBufferManager.getPhysicalSize(pid);
+            freeFrames -= buildBufferManager.getNumFrames(pid);
         }
     }
 
+    int buildParitialLoadPid = -1;
+    int buildParitialNextTid = -1;
+    long buildParitialResetReader = -1;
+
     private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws 
HyracksDataException {
-        RunFileReader r = wr.createDeleteOnCloseReader();
+        if (pid == 225) {
+            int i = 0;
+        }
+        RunFileReader r = wr.createReader();
         r.open();
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
+        if (buildParitialLoadPid == pid && buildParitialResetReader > 0) {
+            r.reset(buildParitialResetReader);
         }
+        int framesLoaded = 0;
         while (r.nextFrame(reloadBuffer)) {
+            framesLoaded++;
             accessorBuild.reset(reloadBuffer.getBuffer());
-            for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
+            spillReadCount++;
+            for (int tid = buildParitialNextTid > 0 ? buildParitialNextTid : 
0; tid < accessorBuild
+                    .getTupleCount(); tid++) {
                 if (!buildBufferManager.insertTuple(pid, accessorBuild, tid, 
tempPtr)) {
-                    // for some reason (e.g. due to fragmentation) if the 
inserting failed, we need to clear the occupied frames
-                    buildBufferManager.clearPartition(pid);
+                    // for some reason (e.g. due to fragmentation) if the 
inserting failed
+                    // we need to start this partition from this location on 
the next round.
+                    buildParitialLoadPid = pid;
+                    buildParitialNextTid = tid;
+                    buildParitialResetReader = r.getReadPointer();
+                    ipjd.buildLoad(pid);
+                    createInMemoryJoiner(pid);
                     r.close();
                     return false;
                 }
             }
-            spillReadCount++;
+        }
+        if (framesLoaded == 0) {
+            int t = 0;
         }
 
-        r.close();
         ipjd.buildLoad(pid);
+        createInMemoryJoiner(pid);
+        r.close();
         buildRFWriters[pid] = null;
+        buildParitialLoadPid = -1;
+        buildParitialNextTid = -1;
+        buildParitialResetReader = -1;
         return true;
     }
 
-    private int selectPartitionsToReload(int freeSpace, int pid) {
-        for (int id = ipjd.buildNextSpilled(0); id >= 0; id = 
ipjd.buildNextSpilled(id + 1)) {
+    private int selectPartitionsToReload(int freeFrames, int pid, boolean 
partitalLoad) {
+        int freeSpace = freeFrames * ctx.getInitialFrameSize();
+        if (freeSpace > 0 && buildParitialLoadPid > 0 && 
buildParitialResetReader > 0) {
+            return buildParitialLoadPid;
+        }
+        for (int id = ipjd.buildNextSpilled(pid); id >= 0; id = 
ipjd.buildNextSpilled(id + 1)) {
             assert buildRFWriters[id].getFileSize() > 0 : "How come a spilled 
partition have size 0?";
-            if (freeSpace >= buildRFWriters[id].getFileSize()) {
+            if (partitalLoad || freeSpace >= buildRFWriters[id].getFileSize()) 
{
                 return id;
             }
         }
@@ -366,20 +395,24 @@ public class IntervalPartitionJoiner {
         inMemJoiner[pid] = null;
     }
 
-    public void initProbe() throws HyracksDataException {
-        int probeMemory = numOfPartitions > memForJoin ? memForJoin : 
numOfPartitions;
-        probeBufferManager = new VPartitionTupleBufferManager(ctx, 
getPartitionMemoryConstrain(), numOfPartitions,
-                (probeMemory) * ctx.getInitialFrameSize());
-
+    public void probeInit() throws HyracksDataException {
         probeRFWriters = new RunFileWriter[numOfPartitions];
+        probeSize = 0;
     }
 
-    public void probe(ByteBuffer buffer, IFrameWriter writer) throws 
HyracksDataException {
+    public void probeStep(ByteBuffer buffer, IFrameWriter writer) throws 
HyracksDataException {
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();
 
         for (int i = 0; i < tupleCount; ++i) {
             int pid = probeHpc.partition(accessorProbe, i, k);
+
+            if (tmp != pid) {
+                System.err.println("probeSize: " + probeSize + " pid: " + pid 
+ " k: " + k + " pair: "
+                        + IntervalPartitionUtil.getIntervalPartition(pid, k));
+                tmp = pid;
+            }
+
             if (!ipjd.hasProbeJoinMap(pid)) {
                 // Set probe join map
                 ipjd.setProbeJoinMap(pid,
@@ -390,23 +423,7 @@ public class IntervalPartitionJoiner {
             if (!ipjd.isProbeJoinMapEmpty(pid)) {
                 if (ipjd.probeHasSpilled(pid)) {
                     // pid is Spilled
-                    while (!probeBufferManager.insertTuple(pid, accessorProbe, 
i, tempPtr)) {
-                        int victim = pid;
-                        if (probeBufferManager.getNumTuples(pid) == 0) {
-                            // current pid is empty, choose the biggest one
-                            victim = selectLargestSpilledPartition();
-                        }
-                        if (victim < 0) {
-                            // current tuple is too big for all the free space
-                            flushBigProbeObjectToDisk(pid, accessorProbe, i);
-                            break;
-                        }
-                        RunFileWriter runFileWriter = 
getSpillWriterOrCreateNewOneIfNotExist(victim, SIDE.PROBE);
-                        spillCount++;
-                        spillWriteCount += 
probeBufferManager.getNumFrames(pid);
-                        probeBufferManager.flushPartition(victim, 
runFileWriter);
-                        probeBufferManager.clearPartition(victim);
-                    }
+                    probeSpillTuple(accessorProbe, i, pid);
                 }
                 for (Iterator<Integer> pidIterator = 
ipjd.getProbeJoinMap(pid); pidIterator.hasNext();) {
                     // pid has join partitions that are Resident
@@ -417,33 +434,43 @@ public class IntervalPartitionJoiner {
                 }
             }
             ipjd.probeIncrementCount(pid);
+            probeSize++;
         }
     }
 
-    public void closeProbe(IFrameWriter writer) throws HyracksDataException {
-        // We do NOT join the spilled partitions here, that decision is made 
at the descriptor level (which join technique to use)
+    /**
+     * Closes the probe process.
+     * We do NOT join the spilled partitions here, use {@link 
#joinSpilledPartitions}.
+     *
+     * @param writer
+     * @throws HyracksDataException
+     */
+    public void probeClose(IFrameWriter writer) throws HyracksDataException {
+        System.err.println("probeSize: " + probeSize);
+
         for (int i = 0; i < inMemJoiner.length; ++i) {
             if (inMemJoiner[i] != null) {
                 closeInMemoryJoiner(i, writer);
                 ipjd.buildLogJoined(i);
+                ipjd.buildRemoveFromJoin(i);
             }
         }
         clearBuildMemory();
-        flushAndClearProbeSpilledPartition();
-        probeBufferManager.close();
-        probeBufferManager = null;
+        flushProbeSpilledPartition();
     }
 
-    private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor 
accessorProbe, int i)
+    private void probeSpillTuple(IFrameTupleAccessor accessorProbe, int 
probeTupleIndex, int pid)
             throws HyracksDataException {
-        if (bigProbeFrameAppender == null) {
-            bigProbeFrameAppender = new FrameTupleAppender(new 
VSizeFrame(ctx));
+        if (pid != probeRunFilePid) {
+            flushProbeSpilledPartition();
+            probeRunFileWriter = getSpillWriterOrCreateNewOneIfNotExist(pid, 
SIDE.PROBE);
+            probeRunFilePid = pid;
         }
-        RunFileWriter runFileWriter = 
getSpillWriterOrCreateNewOneIfNotExist(pid, SIDE.PROBE);
-        if (!bigProbeFrameAppender.append(accessorProbe, i)) {
-            throw new HyracksDataException("The given tuple is too big");
+        if (!probeRunFileAppender.append(accessorProbe, probeTupleIndex)) {
+            probeRunFileAppender.write(probeRunFileWriter, true);
+            probeRunFileAppender.append(accessorProbe, probeTupleIndex);
+            spillWriteCount++;
         }
-        bigProbeFrameAppender.write(runFileWriter, true);
     }
 
     public RunFileReader getBuildRFReader(int pid) throws HyracksDataException 
{
@@ -456,42 +483,28 @@ public class IntervalPartitionJoiner {
 
     public void joinSpilledPartitions(IFrameWriter writer) throws 
HyracksDataException {
         LinkedHashMap<Integer, LinkedHashSet<Integer>> probeInMemoryJoinMap;
-        if (reloadBuffer == null) {
-            reloadBuffer = new VSizeFrame(ctx);
-        }
-        HashSet<Integer> inMemory = new HashSet<>();
         while (ipjd.buildGetSpilledCount() > 0) {
             // Load back spilled build partitions.
             // TODO only load partition required for spill join. Consider both 
sides.
-            bringBackSpilledPartitionIfHasMoreMemory();
-
-            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
+            bringBackSpilledPartitionIfHasMoreMemory(true);
 
             // Create in memory joiners.
-            for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid 
= ipjd
-                    .buildNextInMemoryWithResults(pid + 1)) {
-                createInMemoryJoiner(pid);
-                inMemory.add(pid);
-            }
+            //            for (int pid = ipjd.buildNextInMemoryWithResults(0); 
pid >= 0; pid = ipjd
+            //                    .buildNextInMemoryWithResults(pid + 1)) {
+            //                createInMemoryJoiner(pid);
+            //            }
+
+            probeInMemoryJoinMap = ipjd.probeGetInMemoryJoinMap();
 
             // Join all build partitions with disk probe partitions.
             for (Entry<Integer, LinkedHashSet<Integer>> entry : 
probeInMemoryJoinMap.entrySet()) {
-                if (ipjd.probeGetCount(entry.getKey()) > 0 && 
probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
-                    RunFileReader pReader = getProbeRFReader(entry.getKey());
-                    pReader.open();
-                    while (pReader.nextFrame(reloadBuffer)) {
-                        accessorProbe.reset(reloadBuffer.getBuffer());
-                        for (int i = 0; i < accessorProbe.getTupleCount(); 
++i) {
-                            // Tuple has potential match from build phase
-                            for (Integer j : 
probeInMemoryJoinMap.get(entry.getKey())) {
-                                // j has join partitions that are Resident
-                                if (inMemJoiner[j] != null) {
-                                    inMemJoiner[j].join(accessorProbe, i, 
writer);
-                                }
-                            }
-                        }
-                    }
-                    pReader.close();
+                if (entry.getKey() == 221) {
+                    int t = 0;
+                }
+                System.err.println(" join pid: " + entry.getKey() + " with : " 
+ probeInMemoryJoinMap);
+
+                if (ipjd.probeGetCount(entry.getKey()) > 0 && 
!probeInMemoryJoinMap.get(entry.getKey()).isEmpty()) {
+                    joinSpilledProbeWithBuildMemory(writer, 
probeInMemoryJoinMap, entry.getKey());
                 }
             }
 
@@ -499,13 +512,38 @@ public class IntervalPartitionJoiner {
             for (int pid = ipjd.buildNextInMemoryWithResults(0); pid >= 0; pid 
= ipjd
                     .buildNextInMemoryWithResults(pid + 1)) {
                 closeInMemoryJoiner(pid, writer);
-                ipjd.buildLogJoined(pid);
+                if (pid != buildParitialLoadPid) {
+                    ipjd.buildLogJoined(pid);
+                    ipjd.buildRemoveFromJoin(pid);
+                } else {
+                    int t = 0;
+                }
             }
-            inMemory.clear();
             clearBuildMemory();
         }
     }
 
+    private void joinSpilledProbeWithBuildMemory(IFrameWriter writer,
+            LinkedHashMap<Integer, LinkedHashSet<Integer>> 
probeInMemoryJoinMap, int probePid)
+            throws HyracksDataException {
+        RunFileReader pReader = getProbeRFReader(probePid);
+        pReader.open();
+        while (pReader.nextFrame(reloadBuffer)) {
+            accessorProbe.reset(reloadBuffer.getBuffer());
+            spillReadCount++;
+            for (int i = 0; i < accessorProbe.getTupleCount(); ++i) {
+                // Tuple has potential match from build phase
+                for (Integer j : probeInMemoryJoinMap.get(probePid)) {
+                    // j has join partitions that are Resident
+                    if (inMemJoiner[j] != null) {
+                        inMemJoiner[j].join(accessorProbe, i, writer);
+                    }
+                }
+            }
+        }
+        pReader.close();
+    }
+
     class IntervalPartitionJoinData {
         private LinkedHashMap<Integer, LinkedHashSet<Integer>> probeJoinMap;
 
@@ -555,6 +593,10 @@ public class IntervalPartitionJoiner {
             }
         }
 
+        public void buildClearMemory() {
+            buildInMemoryStatus.clear();
+        }
+
         public void buildIncrementCount(int pid) {
             buildInMemoryStatus.set(pid);
             buildPSizeInTups[pid]++;
@@ -676,8 +718,7 @@ public class IntervalPartitionJoiner {
         }
         if (LOGGER.isLoggable(Level.WARNING)) {
             LOGGER.warning("IntervalPartitionJoiner statitics: " + 
joinComparisonCount + " comparisons, "
-                    + joinResultCount + " results, " + spillCount + " spills, 
" + spillWriteCount
-                    + " spill frames written, " + spillReadCount + " spill 
frames read.");
+                    + joinResultCount + " results, " + spillWriteCount + " 
written, " + spillReadCount + " read.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index f68a49c..ae2f0b4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -32,6 +32,7 @@ public class RunFileReader implements IFrameReader {
     private IFileHandle handle;
     private final IIOManager ioManager;
     private final long size;
+    private long readPreviousPtr;
     private long readPtr;
     private boolean deleteAfterClose;
 
@@ -46,6 +47,7 @@ public class RunFileReader implements IFrameReader {
     public void open() throws HyracksDataException {
         handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_ONLY, 
null);
         readPtr = 0;
+        readPreviousPtr = 0;
     }
 
     @Override
@@ -53,6 +55,7 @@ public class RunFileReader implements IFrameReader {
         if (readPtr >= size) {
             return false;
         }
+        readPreviousPtr = readPtr;
         frame.reset();
 
         int readLength = ioManager.syncRead(handle, readPtr, 
frame.getBuffer());
@@ -79,6 +82,12 @@ public class RunFileReader implements IFrameReader {
 
     public void reset() throws HyracksDataException {
         readPtr = 0;
+        readPreviousPtr = readPtr;
+    }
+
+    public void reset(long pointer) throws HyracksDataException {
+        readPtr = pointer;
+        readPreviousPtr = readPtr;
     }
 
     @Override
@@ -92,4 +101,8 @@ public class RunFileReader implements IFrameReader {
     public long getFileSize() {
         return size;
     }
+
+    public long getReadPointer() {
+        return readPreviousPtr;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/19f0997f/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
index 2513b1b..7e8a8d1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/RunFileStream.java
@@ -50,6 +50,14 @@ public class RunFileStream {
     private long writeCount = 0;
     private long tupleCount = 0;
 
+    /**
+     * The RunFileStream uses two frames to buffer read and write operations.
+     *
+     * @param ctx
+     * @param key
+     * @param status
+     * @throws HyracksDataException
+     */
     public RunFileStream(IHyracksTaskContext ctx, String key, 
IRunFileStreamStatus status) throws HyracksDataException {
         this.ctx = ctx;
         this.key = key;

Reply via email to