Interface clean up.
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/ce593414 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/ce593414 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/ce593414 Branch: refs/heads/ecarm002/interval_join_merge Commit: ce5934148227e57eeebb369d57ae64d87b908959 Parents: 19f0997 Author: Preston Carman <prest...@apache.org> Authored: Thu Sep 29 16:05:18 2016 -0700 Committer: Preston Carman <prest...@apache.org> Committed: Thu Sep 29 16:05:18 2016 -0700 ---------------------------------------------------------------------- .../IntervalIndexJoinOperatorDescriptor.java | 5 +-- .../intervalindex/IntervalIndexJoiner.java | 46 ++++++++++---------- ...IntervalPartitionJoinOperatorDescriptor.java | 9 ++-- .../dataflow/std/join/AbstractMergeJoiner.java | 38 +++++++++------- .../hyracks/dataflow/std/join/IMergeJoiner.java | 6 +-- .../std/join/MergeJoinOperatorDescriptor.java | 6 +-- .../hyracks/dataflow/std/join/MergeJoiner.java | 35 +++++++-------- 7 files changed, 72 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java index d84fabc..fbddfef 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoinOperatorDescriptor.java @@ -151,7 +151,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip } try { state.indexJoiner.setFrame(LEFT_ACTIVITY_ID, buffer); - state.indexJoiner.processMergeUsingLeftTuple(writer); + state.indexJoiner.processLeftFrame(writer); } finally { locks.getLock(partition).unlock(); } @@ -175,8 +175,7 @@ public class IntervalIndexJoinOperatorDescriptor extends AbstractOperatorDescrip if (state.failed) { writer.fail(); } else { - state.indexJoiner.processMergeUsingLeftTuple(writer); - state.indexJoiner.closeResult(writer); + state.indexJoiner.processLeftClose(writer); writer.close(); } state.status.branch[LEFT_ACTIVITY_ID].setStageClose(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java index a4ad666..bac2f45 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java @@ -141,24 +141,6 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { joinResultCount++; } - @Override - public void closeResult(IFrameWriter writer) throws HyracksDataException { - resultAppender.write(writer, true); - activeManager[LEFT_PARTITION].clear(); - activeManager[RIGHT_PARTITION].clear(); - runFileStream[LEFT_PARTITION].close(); - runFileStream[RIGHT_PARTITION].close(); - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() - + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " - + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, " - + runFileStream[RIGHT_PARTITION].getFileCount() + " files, " - + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, " - + runFileStream[RIGHT_PARTITION].getReadCount() + " read]."); - } - } - private void flushMemory(int partition) throws HyracksDataException { activeManager[partition].clear(); } @@ -209,7 +191,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } @Override - public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException { + public void processLeftFrame(IFrameWriter writer) throws HyracksDataException { TupleStatus leftTs = loadLeftTuple(); TupleStatus rightTs = loadRightTuple(); while (leftTs.isKnown() && checkHasMoreProcessing(leftTs, LEFT_PARTITION, RIGHT_PARTITION) @@ -234,6 +216,27 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } } + @Override + public void processLeftClose(IFrameWriter writer) throws HyracksDataException { + processLeftFrame(writer); + + resultAppender.write(writer, true); + activeManager[LEFT_PARTITION].clear(); + activeManager[RIGHT_PARTITION].clear(); + runFileStream[LEFT_PARTITION].close(); + runFileStream[RIGHT_PARTITION].close(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("IntervalIndexJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, left[" + leftSpillCount + " spills, " + runFileStream[LEFT_PARTITION].getFileCount() + + " files, " + runFileStream[LEFT_PARTITION].getWriteCount() + " written, " + + runFileStream[LEFT_PARTITION].getReadCount() + " read]. right[" + rightSpillCount + " spills, " + + runFileStream[RIGHT_PARTITION].getFileCount() + " files, " + + runFileStream[RIGHT_PARTITION].getWriteCount() + " written, " + + runFileStream[RIGHT_PARTITION].getReadCount() + " read]."); + } + + } + private boolean checkHasMoreProcessing(TupleStatus ts, int partition, int joinPartition) { return ts.isLoaded() || status.branch[partition].isRunFileWriting() || (checkHasMoreTuples(joinPartition) && activeManager[partition].hasRecords()); @@ -493,9 +496,4 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner { } } - @Override - public void closeInput(int partition) throws HyracksDataException { - // No changes are required. - } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java index 60a4697..b4965ef 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java @@ -67,11 +67,11 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName()); - public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int k, int[] leftKeys, - int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, + public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memoryForJoin, int k, + int[] leftKeys, int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf, RangeId rangeId) { super(spec, 2, 1); - this.memsize = memsize; + this.memsize = memoryForJoin; this.buildKey = leftKeys[0]; this.probeKey = rightKeys[0]; this.k = k; @@ -161,7 +161,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes state.ipj.buildInit(); LOGGER.setLevel(Level.FINE); - System.out.println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel()); + System.out + .println("IntervalPartitionJoinOperatorDescriptor: Logging level is: " + LOGGER.getLevel()); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("IntervalPartitionJoin is starting the build phase with " + state.k + " granules repesenting " + state.intervalPartitions + " interval partitions using " http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java index aa065cd..b1b4075 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/AbstractMergeJoiner.java @@ -80,6 +80,29 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { resultAppender = new FrameTupleAppender(new VSizeFrame(ctx)); } + public void setLeftFrame(ByteBuffer buffer) { + setFrame(LEFT_PARTITION, buffer); + } + + public void setRightFrame(ByteBuffer buffer) { + setFrame(RIGHT_PARTITION, buffer); + } + + protected TupleStatus loadMemoryTuple(int branch) { + TupleStatus loaded; + if (inputAccessor[branch] != null && inputAccessor[branch].exists()) { + // Still processing frame. + int test = inputAccessor[branch].getTupleCount(); + loaded = TupleStatus.LOADED; + } else if (status.branch[branch].hasMore()) { + loaded = TupleStatus.UNKNOWN; + } else { + // No more frames or tuples to process. + loaded = TupleStatus.EMPTY; + } + return loaded; + } + protected TupleStatus pauseAndLoadRightTuple() { status.continueRightLoad = true; locks.getRight(partition).signal(); @@ -99,21 +122,6 @@ public abstract class AbstractMergeJoiner implements IMergeJoiner { return TupleStatus.LOADED; } - protected TupleStatus loadMemoryTuple(int branch) { - TupleStatus loaded; - if (inputAccessor[branch] != null && inputAccessor[branch].exists()) { - // Still processing frame. - int test = inputAccessor[branch].getTupleCount(); - loaded = TupleStatus.LOADED; - } else if (status.branch[branch].hasMore()) { - loaded = TupleStatus.UNKNOWN; - } else { - // No more frames or tuples to process. - loaded = TupleStatus.EMPTY; - } - return loaded; - } - @Override public void setFrame(int branch, ByteBuffer buffer) { inputBuffer[branch].clear(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java index 4268ec9..051d3e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoiner.java @@ -25,12 +25,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IMergeJoiner { - void closeResult(IFrameWriter writer) throws HyracksDataException; + void processLeftFrame(IFrameWriter writer) throws HyracksDataException; - void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException; + void processLeftClose(IFrameWriter writer) throws HyracksDataException; void setFrame(int partition, ByteBuffer buffer); - void closeInput(int partition) throws HyracksDataException; - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java index cbe1a66..c5f612f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java @@ -151,7 +151,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { } try { state.joiner.setFrame(LEFT_ACTIVITY_ID, buffer); - state.joiner.processMergeUsingLeftTuple(writer); + state.joiner.processLeftFrame(writer); } finally { locks.getLock(partition).unlock(); } @@ -175,9 +175,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor { if (state.failed) { writer.fail(); } else { - state.joiner.closeInput(LEFT_ACTIVITY_ID); - state.joiner.processMergeUsingLeftTuple(writer); - state.joiner.closeResult(writer); + state.joiner.processLeftClose(writer); writer.close(); } state.status.branch[LEFT_ACTIVITY_ID].setStageClose(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ce593414/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java index 9c93828..d4195ec 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoiner.java @@ -110,17 +110,6 @@ public class MergeJoiner extends AbstractMergeJoiner { joinResultCount++; } - @Override - public void closeResult(IFrameWriter writer) throws HyracksDataException { - resultAppender.write(writer, true); - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount - + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, " - + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount() - + " spill frames read."); - } - } - private void flushMemory() throws HyracksDataException { memoryBuffer.clear(); bufferManager.reset(); @@ -176,20 +165,13 @@ public class MergeJoiner extends AbstractMergeJoiner { return TupleStatus.LOADED; } - @Override - public void closeInput(int partition) throws HyracksDataException { - if (status.branch[partition].isRunFileWriting()) { - unfreezeAndContinue(inputAccessor[partition]); - } - } - /** * Left * * @throws HyracksDataException */ @Override - public void processMergeUsingLeftTuple(IFrameWriter writer) throws HyracksDataException { + public void processLeftFrame(IFrameWriter writer) throws HyracksDataException { TupleStatus leftTs = loadLeftTuple(); TupleStatus rightTs = loadRightTuple(); while (leftTs.isLoaded() && (status.branch[RIGHT_PARTITION].hasMore() || memoryHasTuples())) { @@ -209,6 +191,21 @@ public class MergeJoiner extends AbstractMergeJoiner { } } + @Override + public void processLeftClose(IFrameWriter writer) throws HyracksDataException { + if (status.branch[LEFT_PARTITION].isRunFileWriting()) { + unfreezeAndContinue(inputAccessor[LEFT_PARTITION]); + } + processLeftFrame(writer); + resultAppender.write(writer, true); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("MergeJoiner statitics: " + joinComparisonCount + " comparisons, " + joinResultCount + + " results, " + spillCount + " spills, " + runFileStream.getFileCount() + " files, " + + runFileStream.getWriteCount() + " spill frames written, " + runFileStream.getReadCount() + + " spill frames read."); + } + } + private TupleStatus processLeftTupleSpill(IFrameWriter writer) throws HyracksDataException { // System.err.print("Spill ");