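Finalize the range in context: the merge join checker factories now take an IHyracksTaskContext instead of an IRangeMap, and the range map is looked up from the task context by RangeId where it is needed.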

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/13af53a7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/13af53a7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/13af53a7

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 13af53a7b26c421e58848f6614b2cd0534cd72f4
Parents: 2061a38
Author: Preston Carman <prest...@apache.org>
Authored: Mon Aug 15 14:30:00 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Mon Aug 15 14:30:00 2016 -0700

----------------------------------------------------------------------
 .../physical/AbstractIntervalJoinPOperator.java | 19 +++++++++++----
 .../IntervalLocalRangeOperatorDescriptor.java   |  2 +-
 .../rules/IntervalSplitPartitioningRule.java    |  8 ++++---
 .../asterix/optimizer/rules/util/JoinUtils.java | 25 +++++++++++---------
 .../AfterIntervalMergeJoinCheckerFactory.java   |  4 ++--
 .../BeforeIntervalMergeJoinCheckerFactory.java  |  4 ++--
 ...overedByIntervalMergeJoinCheckerFactory.java |  4 ++--
 .../CoversIntervalMergeJoinCheckerFactory.java  |  4 ++--
 .../EndedByIntervalMergeJoinCheckerFactory.java |  4 ++--
 .../EndsIntervalMergeJoinCheckerFactory.java    |  4 ++--
 .../joins/IIntervalMergeJoinCheckerFactory.java |  4 ++--
 .../MeetsIntervalMergeJoinCheckerFactory.java   |  4 ++--
 .../MetByIntervalMergeJoinCheckerFactory.java   |  4 ++--
 ...lappedByIntervalMergeJoinCheckerFactory.java |  4 ++--
 ...rlappingIntervalMergeJoinCheckerFactory.java | 15 ++++++++++--
 ...OverlapsIntervalMergeJoinCheckerFactory.java |  4 ++--
 ...tartedByIntervalMergeJoinCheckerFactory.java |  4 ++--
 .../StartsIntervalMergeJoinCheckerFactory.java  |  4 ++--
 .../intervalindex/IntervalIndexJoiner.java      |  2 +-
 ...IntervalPartitionJoinOperatorDescriptor.java | 16 +++++++------
 .../operators/physical/MergeJoinPOperator.java  | 15 +++++++++---
 .../properties/OrderedPartitionedProperty.java  |  6 +++++
 .../algebra/util/OperatorPropertiesUtil.java    | 15 +++++++-----
 .../rules/EnforceStructuralPropertiesRule.java  |  4 +---
 .../hyracks/dataflow/std/base/RangeId.java      | 25 ++++++++++++++++++--
 .../connectors/PartitionRangeDataWriter.java    |  2 +-
 .../std/join/IMergeJoinCheckerFactory.java      |  4 ++--
 .../std/join/MergeJoinOperatorDescriptor.java   |  2 +-
 .../join/NaturalMergeJoinCheckerFactory.java    |  4 ++--
 .../misc/RangeForwardOperatorDescriptor.java    |  3 ++-
 .../std/sort/AbstractExternalSortRunMerger.java | 24 +++++++++----------
 31 files changed, 155 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
index c400cdf..3be9e80 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
@@ -84,10 +84,18 @@ public abstract class AbstractIntervalJoinPOperator extends 
AbstractJoinPOperato
         return mjcf;
     }
 
-    public RangeId getRangeId() {
+    public RangeId getLeftRangeId() {
         return leftRangeId;
     }
 
+    public RangeId getRightRangeId() {
+        return rightRangeId;
+    }
+
+    public IRangeMap getRangeMapHint() {
+        return rangeMapHint;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.EXTENSION_OPERATOR;
@@ -111,7 +119,8 @@ public abstract class AbstractIntervalJoinPOperator extends 
AbstractJoinPOperato
         for (LogicalVariable v : keysLeftBranch) {
             order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : 
OrderKind.DESC));
         }
-        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, 
leftRangeId, RangePartitioningType.PROJECT, rangeMapHint);
+        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, 
leftRangeId,
+                RangePartitioningType.PROJECT, rangeMapHint);
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
         propsLocal.add(new LocalOrderProperty(order));
         deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -141,8 +150,10 @@ public abstract class AbstractIntervalJoinPOperator 
extends AbstractJoinPOperato
         ispRight.add(new LocalOrderProperty(orderRight));
 
         if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            ppLeft = new OrderedPartitionedProperty(orderLeft, null, 
leftRangeId, mjcf.getLeftPartitioningType(), rangeMapHint);
-            ppRight = new OrderedPartitionedProperty(orderRight, null, 
rightRangeId, mjcf.getRightPartitioningType(), rangeMapHint);
+            ppLeft = new OrderedPartitionedProperty(orderLeft, null, 
leftRangeId, mjcf.getLeftPartitioningType(),
+                    rangeMapHint);
+            ppRight = new OrderedPartitionedProperty(orderRight, null, 
rightRangeId, mjcf.getRightPartitioningType(),
+                    rangeMapHint);
         }
 
         pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index 584e30f..f24dc7c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -154,7 +154,7 @@ public class IntervalLocalRangeOperatorDescriptor extends 
AbstractOperatorDescri
                         writers[i].open();
                         resultAppender[i] = new FrameTupleAppender(new 
VSizeFrame(ctx), true);
                     }
-                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx.getStateObject(rangeId);
+                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
                     IRangeMap rangeMap = rangeState.getRangeMap();
                     nodeRangeStart = getPartitionBoundryStart(rangeMap);
                     nodeRangeEnd = getPartitionBoundryEnd(rangeMap);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index eafc2bb..f0ff610 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -418,14 +418,15 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
             MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo;
             MergeJoinPOperator mjpoClone = new 
MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(),
                     mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), 
memoryJoinSize,
-                    mjpo.getMergeJoinCheckerFactory(), mjpo.getRangeId(), 
null);
+                    mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), 
mjpo.getRightRangeId(), null);
             ijoClone.setPhysicalOperator(mjpoClone);
         } else if (joinPo.getOperatorTag() == 
PhysicalOperatorTag.EXTENSION_OPERATOR) {
             if (joinPo instanceof IntervalIndexJoinPOperator) {
                 IntervalIndexJoinPOperator iijpo = 
(IntervalIndexJoinPOperator) joinPo;
                 IntervalIndexJoinPOperator iijpoClone = new 
IntervalIndexJoinPOperator(iijpo.getKind(),
                         iijpo.getPartitioningType(), 
iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(),
-                        memoryJoinSize, 
iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getRangeId(), null);
+                        memoryJoinSize, 
iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getLeftRangeId(),
+                        iijpo.getRightRangeId(), null);
                 ijoClone.setPhysicalOperator(iijpoClone);
             } else if (joinPo instanceof IntervalPartitionJoinPOperator) {
                 IntervalPartitionJoinPOperator ipjpo = 
(IntervalPartitionJoinPOperator) joinPo;
@@ -433,7 +434,8 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
                         ipjpo.getPartitioningType(), 
ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(),
                         memoryJoinSize, ipjpo.getBuildTupleCount(), 
ipjpo.getProbeTupleCount(),
                         ipjpo.getBuildMaxDuration(), 
ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(),
-                        ipjpo.getIntervalMergeJoinCheckerFactory(), 
ipjpo.getRangeId(), null);
+                        ipjpo.getIntervalMergeJoinCheckerFactory(), 
ipjpo.getLeftRangeId(), ipjpo.getRightRangeId(),
+                        null);
                 ijoClone.setPhysicalOperator(iijpoClone);
             } else {
                 return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index 2707403..795ee82 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -55,7 +55,6 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
 import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
@@ -142,9 +141,10 @@ public class JoinUtils {
     private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator 
op, FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IntervalJoinExpressionAnnotation ijea,
             IOptimizationContext context) {
-        IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi);
+        RangeId leftRangeId = context.newRangeId();
+        IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, 
leftRangeId);
         op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST, sideLeft,
-                sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, 
context.newRangeId(),
+                sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, 
leftRangeId,
                 context.newRangeId(), ijea.getRangeMap()));
     }
 
@@ -161,20 +161,22 @@ public class JoinUtils {
         int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? 
ijea.getTuplesPerFrame()
                 : 
context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame();
 
-        IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi);
+        RangeId leftRangeId = context.newRangeId();
+        IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
         op.setPhysicalOperator(new 
IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
                 sideLeft, sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
-                rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, 
mjcf, context.newRangeId(),
-                context.newRangeId(), ijea.getRangeMap()));
+                rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, 
mjcf, leftRangeId, context.newRangeId(),
+                ijea.getRangeMap()));
     }
 
     private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, 
FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IntervalJoinExpressionAnnotation ijea,
             IOptimizationContext context) {
-        IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi);
+        RangeId leftRangeId = context.newRangeId();
+        IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
         op.setPhysicalOperator(new 
IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
-                sideLeft, sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf,
-                context.newRangeId(), context.newRangeId(), 
ijea.getRangeMap()));
+                sideLeft, sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, 
leftRangeId,
+                context.newRangeId(), ijea.getRangeMap()));
     }
 
     private static int getMaxDuration(List<LogicalVariable> lv, 
IOptimizationContext context) {
@@ -230,8 +232,9 @@ public class JoinUtils {
         }
     }
 
-    private static IIntervalMergeJoinCheckerFactory 
getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi) {
-        IIntervalMergeJoinCheckerFactory mjcf = new 
OverlappingIntervalMergeJoinCheckerFactory();
+    private static IIntervalMergeJoinCheckerFactory 
getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi,
+            RangeId rangeId) {
+        IIntervalMergeJoinCheckerFactory mjcf = new 
OverlappingIntervalMergeJoinCheckerFactory(rangeId);
         if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY)) {
             mjcf = new OverlappedByIntervalMergeJoinCheckerFactory();
         } else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
index 572241c..09b3020 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/AfterIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class AfterIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new AfterIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
index ff5acf2..3e913d2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/BeforeIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class BeforeIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new BeforeIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
index 64b0c2a..a513cbc 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoveredByIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class CoveredByIntervalMergeJoinCheckerFactory extends 
AbstractIntervalInverseMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new CoveredByIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
index dc50451..cc9b37d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/CoversIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class CoversIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new CoversIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
index 68e2922..c3a681c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndedByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class EndedByIntervalMergeJoinCheckerFactory extends 
AbstractIntervalInverseMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new EndedByIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
index e5b7be0..295bd04 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/EndsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class EndsIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new EndsIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
index e4ceeb1..f2e3d80 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/IIntervalMergeJoinCheckerFactory.java
@@ -20,14 +20,14 @@ package org.apache.asterix.runtime.operators.joins;
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
 
 public interface IIntervalMergeJoinCheckerFactory extends 
IMergeJoinCheckerFactory, Serializable {
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap)
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx)
             throws HyracksDataException;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
index 038f9ef..c970fd2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MeetsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class MeetsIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new MeetsIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
index 6c3fe32..fad2d88 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/MetByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class MetByIntervalMergeJoinCheckerFactory extends 
AbstractIntervalInverseMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new MetByIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
index 8031181..c47381a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappedByIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class OverlappedByIntervalMergeJoinCheckerFactory extends 
AbstractIntervalInverseMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new OverlappedByIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
index 195a85f..a5f7770 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
@@ -19,18 +19,29 @@
 package org.apache.asterix.runtime.operators.joins;
 
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
 
 public class OverlappingIntervalMergeJoinCheckerFactory extends AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
+    private final RangeId rangeId;
+
+    public OverlappingIntervalMergeJoinCheckerFactory(RangeId rangeId) {
+        this.rangeId = rangeId;
+    }
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap)
-            throws HyracksDataException {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition,
+            IHyracksTaskContext ctx) throws HyracksDataException {
         int fieldIndex = 0;
+        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
+                .getStateObject(new RangeId(rangeId.getId(), ctx));
+        IRangeMap rangeMap = rangeState.getRangeMap();
         if (ATypeTag.INT64.serialize() != rangeMap.getTag(0, 0)) {
             throw new HyracksDataException("Invalid range map type for interval merge join checker.");
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
index e3ecf2e..0cf3ac1 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlapsIntervalMergeJoinCheckerFactory.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class OverlapsIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new OverlapsIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
index 431aa8e..0938fe2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartedByIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class StartedByIntervalMergeJoinCheckerFactory extends 
AbstractIntervalInverseMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new StartedByIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
index a05615c..924b442 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/StartsIntervalMergeJoinCheckerFactory.java
@@ -18,14 +18,14 @@
  */
 package org.apache.asterix.runtime.operators.joins;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public class StartsIntervalMergeJoinCheckerFactory extends 
AbstractIntervalMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IRangeMap rangeMap) {
+    public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition, IHyracksTaskContext ctx) {
         return new StartsIntervalMergeJoinChecker(keys0, keys1);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index 9ca536b..6f04cad 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         super(ctx, partition, status, locks, leftRd, rightRd);
         this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT;
 
-        this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null);
+        this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, ctx);
 
         this.leftKey = leftKeys[0];
         this.rightKey = rightKeys[0];

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 8c4c43d..4e1850c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -160,18 +160,20 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
                         }
                     }
 
-                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx.getStateObject(rangeId);
-                    long partitionStart = 
IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition);
+                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx
+                            .getStateObject(new RangeId(rangeId.getId(), ctx));
+                    long partitionStart = 
IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
+                            partition);
                     long partitionEnd = 
IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
-                    ITuplePartitionComputer buildHpc = new 
IntervalPartitionComputerFactory(buildKey, k, partitionStart,
-                            partitionEnd).createPartitioner();
-                    ITuplePartitionComputer probeHpc = new 
IntervalPartitionComputerFactory(probeKey, k, partitionStart,
-                            partitionEnd).createPartitioner();
+                    ITuplePartitionComputer buildHpc = new 
IntervalPartitionComputerFactory(buildKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
+                    ITuplePartitionComputer probeHpc = new 
IntervalPartitionComputerFactory(probeKey, state.k,
+                            partitionStart, partitionEnd).createPartitioner();
 
                     state.partition = partition;
                     state.intervalPartitions = 
IntervalPartitionUtil.getMaxPartitions(state.k);
                     state.memoryForJoin = memsize;
-                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap());
+                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, ctx);
                     state.ipj = new IntervalPartitionJoiner(ctx, 
state.memoryForJoin, state.k, state.intervalPartitions,
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, 
buildHpc, probeHpc);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
index c24f3c8..7fa7fdf 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
@@ -93,10 +93,18 @@ public class MergeJoinPOperator extends 
AbstractJoinPOperator {
         return mjcf;
     }
 
-    public RangeId getRangeId() {
+    public RangeId getLeftRangeId() {
         return leftRangeId;
     }
 
+    public RangeId getRightRangeId() {
+        return rightRangeId;
+    }
+
+    public IRangeMap getRangeMapHint() {
+        return rangeMapHint;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.MERGE_JOIN;
@@ -113,8 +121,8 @@ public class MergeJoinPOperator extends 
AbstractJoinPOperator {
         for (LogicalVariable v : keysLeftBranch) {
             order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : 
OrderKind.DESC));
         }
-        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, 
leftRangeId, RangePartitioningType.PROJECT,
-                rangeMapHint);
+        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, 
leftRangeId,
+                RangePartitioningType.PROJECT, rangeMapHint);
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
         propsLocal.add(new LocalOrderProperty(order));
         deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -176,4 +184,5 @@ public class MergeJoinPOperator extends 
AbstractJoinPOperator {
         ILogicalOperator src2 = op.getInputs().get(1).getValue();
         builder.contributeGraphEdge(src2, 0, op, 1);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 040e663..92d4098 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -35,6 +35,7 @@ public class OrderedPartitionedProperty implements 
IPartitioningProperty {
     private INodeDomain domain;
     private RangeId rangeId;
     private RangePartitioningType rangeType;
+    private IRangeMap rangeMapHint;
 
     public OrderedPartitionedProperty(List<OrderColumn> orderColumns, 
INodeDomain domain, RangeId rangeId,
             RangePartitioningType rangeType, IRangeMap rangeMapHint) {
@@ -42,6 +43,7 @@ public class OrderedPartitionedProperty implements 
IPartitioningProperty {
         this.orderColumns = orderColumns;
         this.rangeId = rangeId;
         this.rangeType = rangeType;
+        this.rangeMapHint = rangeMapHint;
     }
 
     public OrderedPartitionedProperty(List<OrderColumn> orderColumns, 
INodeDomain domain, RangeId rangeId) {
@@ -93,6 +95,10 @@ public class OrderedPartitionedProperty implements 
IPartitioningProperty {
         return rangeId;
     }
 
+    public IRangeMap getRangeMapHint() {
+        return rangeMapHint;
+    }
+
     @Override
     public INodeDomain getNodeDomain() {
         return domain;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 353a782..fd340d2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -45,6 +45,9 @@ public class OperatorPropertiesUtil {
 
     private static final String MOVABLE = "isMovable";
 
+    private OperatorPropertiesUtil() {
+    }
+
     public static <T> boolean disjoint(Collection<T> c1, Collection<T> c2) {
         for (T m : c1) {
             if (c2.contains(m)) {
@@ -58,7 +61,7 @@ public class OperatorPropertiesUtil {
     private static void getFreeVariablesInOp(ILogicalOperator op, 
Set<LogicalVariable> freeVars)
             throws AlgebricksException {
         VariableUtilities.getUsedVariables(op, freeVars);
-        HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> produced = new HashSet<>();
         VariableUtilities.getProducedVariables(op, produced);
         for (LogicalVariable v : produced) {
             freeVars.remove(v);
@@ -75,13 +78,13 @@ public class OperatorPropertiesUtil {
      */
     public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator 
op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
-        HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> produced = new HashSet<>();
         VariableUtilities.getProducedVariables(op, produced);
         for (LogicalVariable v : produced) {
             freeVars.remove(v);
         }
 
-        HashSet<LogicalVariable> used = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> used = new HashSet<>();
         VariableUtilities.getUsedVariables(op, used);
         for (LogicalVariable v : used) {
             freeVars.add(v);
@@ -108,7 +111,7 @@ public class OperatorPropertiesUtil {
      */
     public static void getFreeVariablesInPath(ILogicalOperator op, 
ILogicalOperator dest, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
-        Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+        Set<LogicalVariable> producedVars = new ListSet<>();
         VariableUtilities.getLiveVariables(op, freeVars);
         collectUsedAndProducedVariablesInPath(op, dest, freeVars, 
producedVars);
         freeVars.removeAll(producedVars);
@@ -163,13 +166,13 @@ public class OperatorPropertiesUtil {
     }
 
     public static boolean hasFreeVariablesInSelfOrDesc(AbstractLogicalOperator 
op) throws AlgebricksException {
-        HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> free = new HashSet<>();
         getFreeVariablesInSelfOrDesc(op, free);
         return !free.isEmpty();
     }
 
     public static boolean hasFreeVariables(ILogicalOperator op) throws 
AlgebricksException {
-        HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> free = new HashSet<>();
         getFreeVariablesInOp(op, free);
         return !free.isEmpty();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 4ec5e27..f3adffd 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -586,9 +586,7 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
                     List<ILocalStructuralProperty> reqdLocals = 
required.getLocalProperties();
 
                     // Add RangeForwardOperator.
-                    IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
-                            .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
-                    addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context);
+                    addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), opp.getRangeMapHint(), context);
 
                     boolean propWasSet = false;
                     pop = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
index befaad9..774dd2a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -20,12 +20,25 @@ package org.apache.hyracks.dataflow.std.base;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
 /**
  * Represents a range id in a logical plan.
  */
 public final class RangeId implements Serializable {
     private static final long serialVersionUID = 1L;
     private final int id;
+    private int partition = -1;
+
+    public RangeId(int id, int partition) {
+        this.id = id;
+        this.partition = partition;
+    }
+
+    public RangeId(int id, IHyracksTaskContext ctx) {
+        this.id = id;
+        this.partition = ctx.getTaskAttemptId().getTaskId().getPartition();
+    }
 
     public RangeId(int id) {
         this.id = id;
@@ -35,9 +48,17 @@ public final class RangeId implements Serializable {
         return id;
     }
 
+    public int getPartition() {
+        return partition;
+    }
+
+    public void setPartition(int partition) {
+        this.partition = partition;
+    }
+
     @Override
     public String toString() {
-        return "RangeId(#" + id + ")";
+        return "RangeId(#" + id + (partition >= 0 ? "," + partition : "") + ")";
     }
 
     @Override
@@ -45,7 +66,7 @@ public final class RangeId implements Serializable {
         if (!(obj instanceof RangeId)) {
             return false;
         } else {
-            return id == ((RangeId) obj).getId();
+            return id == ((RangeId) obj).getId() && partition == ((RangeId) obj).getPartition();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index 2740a60..c08035a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -50,7 +50,7 @@ public class PartitionRangeDataWriter extends 
AbstractPartitionDataWriter {
     @Override
     public void open() throws HyracksDataException {
         super.open();
-        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
         tpc = trpcf.createPartitioner(rangeState.getRangeMap());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
index 850bf56..d7ac550 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
@@ -20,13 +20,13 @@ package org.apache.hyracks.dataflow.std.join;
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IMergeJoinCheckerFactory extends Serializable {
 
-    IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException;
+    IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) throws HyracksDataException;
 
     RangePartitioningType getLeftPartitioningType();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 6f0b33b..5624bb5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends 
AbstractOperatorDescriptor {
             locks.setPartitions(nPartitions);
             RecordDescriptor inRecordDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IMergeJoinChecker mjc = 
mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
-                    partition, null);
+                    partition, ctx);
             return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
index 15df580..abdadb6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
@@ -18,9 +18,9 @@
  */
 package org.apache.hyracks.dataflow.std.join;
 
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import 
org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
@@ -33,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements 
IMergeJoinCheckerFactory
     }
 
     @Override
-    public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
+    public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IHyracksTaskContext ctx) {
         final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 067246d..04cfca3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -90,7 +90,8 @@ public class RangeForwardOperatorDescriptor extends 
AbstractOperatorDescriptor {
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap);
+                    state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(),
+                            new RangeId(rangeId.getId(), ctx), rangeMap);
                     ctx.setStateObject(state);
                     writer.open();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/13af53a7/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index e032e6a..6d9d085 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -49,7 +49,6 @@ public abstract class AbstractExternalSortRunMerger {
     private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final int framesLimit;
-    private final int MAX_FRAME_SIZE;
     private final int topK;
     private List<GroupVSizeFrame> inFrames;
     private VSizeFrame outputFrame;
@@ -75,14 +74,13 @@ public abstract class AbstractExternalSortRunMerger {
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
-        this.MAX_FRAME_SIZE = FrameConstants.MAX_FRAMESIZE;
         this.topK = topK;
     }
 
     public void process() throws HyracksDataException {
         IFrameWriter finalWriter = null;
         try {
-            if (runs.size() <= 0) {
+            if (runs.isEmpty()) {
                 finalWriter = prepareSkipMergingFinalResultWriter(writer);
                 finalWriter.open();
                 if (sorter != null) {
@@ -169,9 +167,10 @@ public abstract class AbstractExternalSortRunMerger {
         }
     }
 
-    private static int selectPartialRuns(int budget, List<GeneratedRunFileReader> runs,
+    private static int selectPartialRuns(int argBudget, List<GeneratedRunFileReader> runs,
             List<GeneratedRunFileReader> partialRuns, BitSet runAvailable, int stop) {
         partialRuns.clear();
+        int budget = argBudget;
         int maxFrameSizeOfGenRun = 0;
         int nextRunId = runAvailable.nextSetBit(0);
         while (budget > 0 && nextRunId >= 0 && nextRunId < stop) {
@@ -192,13 +191,14 @@ public abstract class AbstractExternalSortRunMerger {
         if (extraFreeMem > 0 && partialRuns.size() > 1) {
             int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
             int avg = (extraFrames / partialRuns.size()) * ctx.getInitialFrameSize();
-            int residue = (extraFrames % partialRuns.size());
+            int residue = extraFrames % partialRuns.size();
             for (int i = 0; i < residue; i++) {
-                partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE,
+                partialRuns.get(i).updateSize(Math.min(FrameConstants.MAX_FRAMESIZE,
                         partialRuns.get(i).getMaxFrameSize() + avg + ctx.getInitialFrameSize()));
             }
             for (int i = residue; i < partialRuns.size() && avg > 0; i++) {
-                partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, partialRuns.get(i).getMaxFrameSize() + avg));
+                partialRuns.get(i)
+                        .updateSize(Math.min(FrameConstants.MAX_FRAMESIZE, partialRuns.get(i).getMaxFrameSize() + avg));
             }
         }
 
@@ -214,17 +214,17 @@ public abstract class AbstractExternalSortRunMerger {
         }
     }
 
-    abstract protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+    protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
             throws HyracksDataException;
 
-    abstract protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
+    protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
 
-    abstract protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+    protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
             throws HyracksDataException;
 
-    abstract protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
+    protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
 
-    abstract protected int[] getSortFields();
+    protected abstract int[] getSortFields();
 
     private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException {
         RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),

Reply via email to