Working version of the range connector and interval join partition.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/1487f2be
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/1487f2be
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/1487f2be

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 1487f2be3a364e32e3fee7a10246b6f548f75476
Parents: 13af53a
Author: Preston Carman <prest...@apache.org>
Authored: Wed Aug 24 14:35:57 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Wed Aug 24 14:35:57 2016 -0700

----------------------------------------------------------------------
 .../physical/AbstractIntervalJoinPOperator.java |  56 +++++---
 .../IntervalLocalRangeOperatorDescriptor.java   |   2 +-
 .../IntervalPartitionJoinPOperator.java         |  86 +++++++-----
 .../asterix/optimizer/base/RuleCollections.java |   2 +-
 .../rules/IntervalSplitPartitioningRule.java    |  61 ++++----
 .../asterix/optimizer/rules/util/JoinUtils.java | 128 ++++++++++++++---
 .../translator/util/FunctionCollection.java     |   6 +
 .../config/AsterixPropertiesAccessor.java       |   6 +-
 .../om/functions/AsterixBuiltinFunctions.java   |   6 +
 .../evaluators/common/FunctionManagerImpl.java  |   6 +-
 .../CalendarDurationFromDateTimeDescriptor.java |   6 +-
 .../IntervalPartitionJoinEndDescriptor.java     |  58 ++++++++
 .../temporal/IntervalPartitionJoinFunction.java | 138 +++++++++++++++++++
 .../IntervalPartitionJoinStartDescriptor.java   |  58 ++++++++
 ...rlappingIntervalMergeJoinCheckerFactory.java |   3 +-
 .../IntervalPartitionComputerFactory.java       |  24 +---
 ...IntervalPartitionJoinOperatorDescriptor.java |  28 +---
 .../IntervalPartitionUtil.java                  |  22 +++
 .../algebra/functions/FunctionIdentifier.java   |   3 +-
 .../logical/visitors/SchemaVariableVisitor.java |   1 +
 .../rules/EnforceStructuralPropertiesRule.java  |   3 +-
 .../rewriter/rules/IntroduceProjectsRule.java   |  18 +--
 .../SetAlgebricksPhysicalOperatorsRule.java     |   7 +
 .../hyracks/dataflow/std/base/RangeId.java      |   3 +-
 .../connectors/PartitionRangeDataWriter.java    |   2 +-
 .../misc/RangeForwardOperatorDescriptor.java    |  10 ++
 26 files changed, 573 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
index 3be9e80..cc2a022 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
@@ -55,7 +55,7 @@ public abstract class AbstractIntervalJoinPOperator extends 
AbstractJoinPOperato
 
     private final List<LogicalVariable> keysLeftBranch;
     private final List<LogicalVariable> keysRightBranch;
-    private final IIntervalMergeJoinCheckerFactory mjcf;
+    protected final IIntervalMergeJoinCheckerFactory mjcf;
     private final RangeId leftRangeId;
     private final RangeId rightRangeId;
     private final IRangeMap rangeMapHint;
@@ -115,14 +115,11 @@ public abstract class AbstractIntervalJoinPOperator 
extends AbstractJoinPOperato
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator iop, 
IOptimizationContext context) {
-        ArrayList<OrderColumn> order = new ArrayList<>();
-        for (LogicalVariable v : keysLeftBranch) {
-            order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : 
OrderKind.DESC));
-        }
+        ArrayList<OrderColumn> order = getLeftRangeOrderColumn();
         IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, 
leftRangeId,
                 RangePartitioningType.PROJECT, rangeMapHint);
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
-        propsLocal.add(new LocalOrderProperty(order));
+        propsLocal.add(new LocalOrderProperty(getLeftLocalSortOrderColumn()));
         deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
     }
 
@@ -134,26 +131,17 @@ public abstract class AbstractIntervalJoinPOperator 
extends AbstractJoinPOperato
 
         IPartitioningProperty ppLeft = null;
         List<ILocalStructuralProperty> ispLeft = new ArrayList<>();
+        ispLeft.add(new LocalOrderProperty(getLeftLocalSortOrderColumn()));
+
         IPartitioningProperty ppRight = null;
         List<ILocalStructuralProperty> ispRight = new ArrayList<>();
-
-        ArrayList<OrderColumn> orderLeft = new ArrayList<>();
-        for (LogicalVariable v : keysLeftBranch) {
-            orderLeft.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC 
: OrderKind.DESC));
-        }
-        ispLeft.add(new LocalOrderProperty(orderLeft));
-
-        ArrayList<OrderColumn> orderRight = new ArrayList<>();
-        for (LogicalVariable v : keysRightBranch) {
-            orderRight.add(new OrderColumn(v, mjcf.isOrderAsc() ? 
OrderKind.ASC : OrderKind.DESC));
-        }
-        ispRight.add(new LocalOrderProperty(orderRight));
+        ispRight.add(new LocalOrderProperty(getRightLocalSortOrderColumn()));
 
         if (op.getExecutionMode() == 
AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            ppLeft = new OrderedPartitionedProperty(orderLeft, null, 
leftRangeId, mjcf.getLeftPartitioningType(),
-                    rangeMapHint);
-            ppRight = new OrderedPartitionedProperty(orderRight, null, 
rightRangeId, mjcf.getRightPartitioningType(),
-                    rangeMapHint);
+            ppLeft = new OrderedPartitionedProperty(getLeftRangeOrderColumn(), 
null, leftRangeId,
+                    mjcf.getLeftPartitioningType(), rangeMapHint);
+            ppRight = new 
OrderedPartitionedProperty(getRightRangeOrderColumn(), null, rightRangeId,
+                    mjcf.getRightPartitioningType(), rangeMapHint);
         }
 
         pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);
@@ -162,6 +150,30 @@ public abstract class AbstractIntervalJoinPOperator 
extends AbstractJoinPOperato
         return new PhysicalRequirements(pv, prc);
     }
 
+    protected ArrayList<OrderColumn> getLeftLocalSortOrderColumn() {
+        return getLeftRangeOrderColumn();
+    }
+
+    protected ArrayList<OrderColumn> getRightLocalSortOrderColumn() {
+        return getRightRangeOrderColumn();
+    }
+
+    protected ArrayList<OrderColumn> getLeftRangeOrderColumn() {
+        ArrayList<OrderColumn> order = new ArrayList<>();
+        for (LogicalVariable v : keysLeftBranch) {
+            order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : 
OrderKind.DESC));
+        }
+        return order;
+    }
+
+    protected ArrayList<OrderColumn> getRightRangeOrderColumn() {
+        ArrayList<OrderColumn> orderRight = new ArrayList<>();
+        for (LogicalVariable v : keysRightBranch) {
+            orderRight.add(new OrderColumn(v, mjcf.isOrderAsc() ? 
OrderKind.ASC : OrderKind.DESC));
+        }
+        return orderRight;
+    }
+
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
index f24dc7c..cf8ad89 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -154,7 +154,7 @@ public class IntervalLocalRangeOperatorDescriptor extends 
AbstractOperatorDescri
                         writers[i].open();
                         resultAppender[i] = new FrameTupleAppender(new 
VSizeFrame(ctx), true);
                     }
-                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx.getStateObject(new RangeId(rangeId.getId(), ctx));
+                    RangeForwardTaskState rangeState = 
RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
                     IRangeMap rangeMap = rangeState.getRangeMap();
                     nodeRangeStart = getPartitionBoundryStart(rangeMap);
                     nodeRangeEnd = getPartitionBoundryEnd(rangeMap);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index ca9dd69..73d159e 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.algebra.operators.physical;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -25,6 +26,8 @@ import 
org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFacto
 import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionJoinOperatorDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,57 +35,45 @@ import 
org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.RangeId;
 
 public class IntervalPartitionJoinPOperator extends 
AbstractIntervalJoinPOperator {
+    private static final int START = 0;
+    private static final int END = 1;
 
     private final int memSizeInFrames;
-    private final long probeTupleCount;
-    private final long probeMaxDuration;
-    private final long buildTupleCount;
-    private final long buildMaxDuration;
-    private final int avgTuplesInFrame;
+    private final int k;
+    private final List<LogicalVariable> leftPartitionVar;
+    private final List<LogicalVariable> rightPartitionVar;
 
     private static final Logger LOGGER = 
Logger.getLogger(IntervalPartitionJoinPOperator.class.getName());
 
     public IntervalPartitionJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> 
sideRightOfEqualities,
-            int memSizeInFrames, long buildTupleCount, long probeTupleCount, 
long buildMaxDuration,
-            long probeMaxDuration, int avgTuplesInFrame, 
IIntervalMergeJoinCheckerFactory mjcf, RangeId leftRangeId,
-            RangeId rightRangeId, IRangeMap rangeMapHint) {
+            int memSizeInFrames, int k, IIntervalMergeJoinCheckerFactory mjcf, 
List<LogicalVariable> leftPartitionVar,
+            List<LogicalVariable> rightPartitionVar, RangeId leftRangeId, 
RangeId rightRangeId,
+            IRangeMap rangeMapHint) {
         super(kind, partitioningType, sideLeftOfEqualities, 
sideRightOfEqualities, mjcf, leftRangeId, rightRangeId,
                 rangeMapHint);
         this.memSizeInFrames = memSizeInFrames;
-        this.buildTupleCount = buildTupleCount;
-        this.probeTupleCount = probeTupleCount;
-        this.buildMaxDuration = buildMaxDuration;
-        this.probeMaxDuration = probeMaxDuration;
-        this.avgTuplesInFrame = avgTuplesInFrame;
+        this.k = k;
+        this.leftPartitionVar = leftPartitionVar;
+        this.rightPartitionVar = rightPartitionVar;
 
         LOGGER.fine("IntervalPartitionJoinPOperator constructed with: 
JoinKind=" + kind + ", JoinPartitioningType="
                 + partitioningType + ", List<LogicalVariable>=" + 
sideLeftOfEqualities + ", List<LogicalVariable>="
-                + sideRightOfEqualities + ", int memSizeInFrames=" + 
memSizeInFrames + ", int buildTupleCount="
-                + buildTupleCount + ", int probeTupleCount=" + probeTupleCount 
+ ", int buildMaxDuration="
-                + buildMaxDuration + ", int probeMaxDuration=" + 
probeMaxDuration + ", int avgTuplesInFrame="
-                + avgTuplesInFrame + ", IMergeJoinCheckerFactory mjcf=" + mjcf 
+ ", RangeId leftRangeId=" + leftRangeId
+                + sideRightOfEqualities + ", int memSizeInFrames=" + 
memSizeInFrames + ", int k=" + k
+                + ", IMergeJoinCheckerFactory mjcf=" + mjcf + ", RangeId 
leftRangeId=" + leftRangeId
                 + ", RangeId rightRangeId=" + rightRangeId + ".");
     }
 
-    public long getProbeTupleCount() {
-        return probeTupleCount;
+    public int getK() {
+        return k;
     }
 
-    public long getProbeMaxDuration() {
-        return probeMaxDuration;
+    public List<LogicalVariable> getLeftPartitionVar() {
+        return leftPartitionVar;
     }
 
-    public long getBuildTupleCount() {
-        return buildTupleCount;
-    }
-
-    public long getBuildMaxDuration() {
-        return buildMaxDuration;
-    }
-
-    public int getAvgTuplesInFrame() {
-        return avgTuplesInFrame;
+    public List<LogicalVariable> getRightPartitionVar() {
+        return rightPartitionVar;
     }
 
     @Override
@@ -93,9 +84,36 @@ public class IntervalPartitionJoinPOperator extends 
AbstractIntervalJoinPOperato
     @Override
     IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, int[] 
keysRight, IOperatorDescriptorRegistry spec,
             RecordDescriptor recordDescriptor, 
IIntervalMergeJoinCheckerFactory mjcf, RangeId rangeId) {
-        return new IntervalPartitionJoinOperatorDescriptor(spec, 
memSizeInFrames, buildTupleCount, probeTupleCount,
-                buildMaxDuration, probeMaxDuration, avgTuplesInFrame, 
keysLeft, keysRight, recordDescriptor, mjcf,
-                rangeId);
+        return new IntervalPartitionJoinOperatorDescriptor(spec, 
memSizeInFrames, k, keysLeft, keysRight,
+                recordDescriptor, mjcf, rangeId);
+    }
+
+    @Override
+    protected ArrayList<OrderColumn> getLeftLocalSortOrderColumn() {
+        ArrayList<OrderColumn> order = new ArrayList<>();
+        if (mjcf.isOrderAsc()) {
+            order.add(new OrderColumn(leftPartitionVar.get(END), 
OrderKind.ASC));
+            order.add(new OrderColumn(leftPartitionVar.get(START), 
OrderKind.DESC));
+        } else {
+            // TODO What does Desc'ing mean?
+            order.add(new OrderColumn(leftPartitionVar.get(START), 
OrderKind.ASC));
+            order.add(new OrderColumn(leftPartitionVar.get(END), 
OrderKind.DESC));
+        }
+        return order;
+    }
+
+    @Override
+    protected ArrayList<OrderColumn> getRightLocalSortOrderColumn() {
+        ArrayList<OrderColumn> order = new ArrayList<>();
+        if (mjcf.isOrderAsc()) {
+            order.add(new OrderColumn(rightPartitionVar.get(END), 
OrderKind.ASC));
+            order.add(new OrderColumn(rightPartitionVar.get(START), 
OrderKind.DESC));
+        } else {
+            // TODO What does Desc'ing mean?
+            order.add(new OrderColumn(rightPartitionVar.get(START), 
OrderKind.ASC));
+            order.add(new OrderColumn(rightPartitionVar.get(END), 
OrderKind.DESC));
+        }
+        return order;
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 521a80e..a4959ab 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -335,7 +335,7 @@ public final class RuleCollections {
         prepareForJobGenRewrites
                 .add(new 
IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
         prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
-        prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule());
+        //prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule());
         // Re-infer all types, so that, e.g., the effect of not-is-null is
         // propagated.
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
index f0ff610..595b994 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -45,6 +45,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -55,6 +56,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOpe
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MaterializePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
@@ -223,65 +225,65 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
 
         // Connect main path
         connectOperators(leftIntervalSplitRef, leftSortedInput, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplitRef.getValue());
+        updateOperatorContext(context, leftIntervalSplitRef);
         connectOperators(leftMaterialize0Ref, leftIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftMaterialize0Ref.getValue());
+        updateOperatorContext(context, leftMaterialize0Ref);
         connectOperators(leftMaterialize1Ref, leftIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftMaterialize1Ref.getValue());
+        updateOperatorContext(context, leftMaterialize1Ref);
         connectOperators(leftMaterialize2Ref, leftIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftMaterialize2Ref.getValue());
+        updateOperatorContext(context, leftMaterialize2Ref);
 
         connectOperators(leftStartsSplitRef, leftMaterialize0Ref, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftStartsSplitRef.getValue());
+        updateOperatorContext(context, leftStartsSplitRef);
 
         connectOperators(rightIntervalSplitRef, rightSortedInput, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplitRef.getValue());
+        updateOperatorContext(context, rightIntervalSplitRef);
         connectOperators(rightMaterialize0Ref, rightIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightMaterialize0Ref.getValue());
+        updateOperatorContext(context, rightMaterialize0Ref);
         connectOperators(rightMaterialize1Ref, rightIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightMaterialize1Ref.getValue());
+        updateOperatorContext(context, rightMaterialize1Ref);
         connectOperators(rightMaterialize2Ref, rightIntervalSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightMaterialize2Ref.getValue());
+        updateOperatorContext(context, rightMaterialize2Ref);
 
         connectOperators(rightStartsSplitRef, rightMaterialize0Ref, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightStartsSplitRef.getValue());
+        updateOperatorContext(context, rightStartsSplitRef);
 
         // Connect left and right starts path
         connectOperators(startsJoinRef, leftStartsSplitRef, context);
         connectOperators(startsJoinRef, rightStartsSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(startsJoinRef.getValue());
+        updateOperatorContext(context, startsJoinRef);
 
         // Connect left ends path
         connectOperators(leftEndsJoinRef, leftMaterialize1Ref, context);
         connectOperators(leftEndsJoinRef, rightStartsSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftEndsJoinRef.getValue());
+        updateOperatorContext(context, leftEndsJoinRef);
         connectOperators(union1Ref, startsJoinRef, context);
         connectOperators(union1Ref, leftEndsJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union1Ref.getValue());
+        updateOperatorContext(context, union1Ref);
 
         // Connect left covers path
         connectOperators(leftCoversJoinRef, leftMaterialize2Ref, context);
         connectOperators(leftCoversJoinRef, rightStartsSplitRef, context);
-        
context.computeAndSetTypeEnvironmentForOperator(leftCoversJoinRef.getValue());
+        updateOperatorContext(context, leftCoversJoinRef);
         connectOperators(union2Ref, union1Ref, context);
         connectOperators(union2Ref, leftCoversJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union2Ref.getValue());
+        updateOperatorContext(context, union2Ref);
 
         // Connect right ends path
         connectOperators(rightEndsJoinRef, leftStartsSplitRef, context);
         connectOperators(rightEndsJoinRef, rightMaterialize1Ref, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightEndsJoinRef.getValue());
+        updateOperatorContext(context, rightEndsJoinRef);
         connectOperators(union3Ref, union2Ref, context);
         connectOperators(union3Ref, rightEndsJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union3Ref.getValue());
+        updateOperatorContext(context, union3Ref);
 
         // Connect right covers path
         connectOperators(rightCoversJoinRef, leftStartsSplitRef, context);
         connectOperators(rightCoversJoinRef, rightMaterialize2Ref, context);
-        
context.computeAndSetTypeEnvironmentForOperator(rightCoversJoinRef.getValue());
+        updateOperatorContext(context, rightCoversJoinRef);
         connectOperators(union4Ref, union3Ref, context);
         connectOperators(union4Ref, rightCoversJoinRef, context);
-        context.computeAndSetTypeEnvironmentForOperator(union4Ref.getValue());
+        updateOperatorContext(context, union4Ref);
 
         // Update context
         opRef.setValue(union4Ref.getValue());
@@ -299,6 +301,14 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
         return true;
     }
 
+    private void updateOperatorContext(IOptimizationContext context, 
Mutable<ILogicalOperator> operatorRef)
+            throws AlgebricksException {
+//        operatorRef.getValue().recomputeSchema();
+//        operatorRef.getValue().computeDeliveredPhysicalProperties(context);
+        
context.computeAndSetTypeEnvironmentForOperator(operatorRef.getValue());
+        
OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator)
 operatorRef, context);
+    }
+
     private LogicalVariable getSortKey(ILogicalOperator op) {
         if (op.getOperatorTag() != LogicalOperatorTag.ORDER) {
             return null;
@@ -341,6 +351,7 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
             child.getValue().getInputs().add(eoRef);
             context.computeAndSetTypeEnvironmentForOperator(eo);
             context.computeAndSetTypeEnvironmentForOperator(child.getValue());
+            
OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator)
 eo, context);
         } else {
             if (parent.getValue().getOperatorTag() == 
LogicalOperatorTag.REPLICATE) {
                 ReplicateOperator ro = (ReplicateOperator) parent.getValue();
@@ -418,7 +429,8 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
             MergeJoinPOperator mjpo = (MergeJoinPOperator) joinPo;
             MergeJoinPOperator mjpoClone = new 
MergeJoinPOperator(mjpo.getKind(), mjpo.getPartitioningType(),
                     mjpo.getKeysLeftBranch(), mjpo.getKeysRightBranch(), 
memoryJoinSize,
-                    mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), 
mjpo.getRightRangeId(), null);
+                    mjpo.getMergeJoinCheckerFactory(), mjpo.getLeftRangeId(), 
mjpo.getRightRangeId(),
+                    mjpo.getRangeMapHint());
             ijoClone.setPhysicalOperator(mjpoClone);
         } else if (joinPo.getOperatorTag() == 
PhysicalOperatorTag.EXTENSION_OPERATOR) {
             if (joinPo instanceof IntervalIndexJoinPOperator) {
@@ -426,16 +438,15 @@ public class IntervalSplitPartitioningRule implements 
IAlgebraicRewriteRule {
                 IntervalIndexJoinPOperator iijpoClone = new 
IntervalIndexJoinPOperator(iijpo.getKind(),
                         iijpo.getPartitioningType(), 
iijpo.getKeysLeftBranch(), iijpo.getKeysRightBranch(),
                         memoryJoinSize, 
iijpo.getIntervalMergeJoinCheckerFactory(), iijpo.getLeftRangeId(),
-                        iijpo.getRightRangeId(), null);
+                        iijpo.getRightRangeId(), iijpo.getRangeMapHint());
                 ijoClone.setPhysicalOperator(iijpoClone);
             } else if (joinPo instanceof IntervalPartitionJoinPOperator) {
                 IntervalPartitionJoinPOperator ipjpo = 
(IntervalPartitionJoinPOperator) joinPo;
                 IntervalPartitionJoinPOperator iijpoClone = new 
IntervalPartitionJoinPOperator(ipjpo.getKind(),
                         ipjpo.getPartitioningType(), 
ipjpo.getKeysLeftBranch(), ipjpo.getKeysRightBranch(),
-                        memoryJoinSize, ipjpo.getBuildTupleCount(), 
ipjpo.getProbeTupleCount(),
-                        ipjpo.getBuildMaxDuration(), 
ipjpo.getProbeMaxDuration(), ipjpo.getAvgTuplesInFrame(),
-                        ipjpo.getIntervalMergeJoinCheckerFactory(), 
ipjpo.getLeftRangeId(), ipjpo.getRightRangeId(),
-                        null);
+                        memoryJoinSize, ipjpo.getK(), 
ipjpo.getIntervalMergeJoinCheckerFactory(),
+                        ipjpo.getLeftPartitionVar(), 
ipjpo.getRightPartitionVar(), ipjpo.getLeftRangeId(),
+                        ipjpo.getRightRangeId(), ipjpo.getRangeMapHint());
                 ijoClone.setPhysicalOperator(iijpoClone);
             } else {
                 return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
index 795ee82..fb70aea 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/JoinUtils.java
@@ -18,17 +18,23 @@
  */
 package org.apache.asterix.optimizer.rules.util;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import 
org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
 import 
org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
 import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import 
org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory;
 import 
org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory;
@@ -44,23 +50,38 @@ import 
org.apache.asterix.runtime.operators.joins.OverlappingIntervalMergeJoinCh
 import 
org.apache.asterix.runtime.operators.joins.OverlapsIntervalMergeJoinCheckerFactory;
 import 
org.apache.asterix.runtime.operators.joins.StartedByIntervalMergeJoinCheckerFactory;
 import 
org.apache.asterix.runtime.operators.joins.StartsIntervalMergeJoinCheckerFactory;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
 
 public class JoinUtils {
 
+    private static final int LEFT = 0;
+    private static final int RIGHT = 1;
+
     private static final Logger LOGGER = 
Logger.getLogger(JoinUtils.class.getName());
 
     private static final Map<FunctionIdentifier, FunctionIdentifier> 
INTERVAL_JOIN_CONDITIONS = new HashMap<>();
@@ -98,8 +119,8 @@ public class JoinUtils {
         }
         List<LogicalVariable> sideLeft = new LinkedList<>();
         List<LogicalVariable> sideRight = new LinkedList<>();
-        List<LogicalVariable> varsLeft = 
op.getInputs().get(0).getValue().getSchema();
-        List<LogicalVariable> varsRight = 
op.getInputs().get(1).getValue().getSchema();
+        List<LogicalVariable> varsLeft = 
op.getInputs().get(LEFT).getValue().getSchema();
+        List<LogicalVariable> varsRight = 
op.getInputs().get(RIGHT).getValue().getSchema();
         AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) 
conditionLE;
         FunctionIdentifier fi = isIntervalJoinCondition(fexp, varsLeft, 
varsRight, sideLeft, sideRight);
         if (fi != null) {
@@ -140,43 +161,116 @@ public class JoinUtils {
 
     private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator 
op, FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IntervalJoinExpressionAnnotation ijea,
-            IOptimizationContext context) {
+            IOptimizationContext context) throws AlgebricksException {
         RangeId leftRangeId = context.newRangeId();
+        RangeId rightRangeId = context.newRangeId();
+        insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+        insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), 
context);
+
         IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, 
leftRangeId);
         op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), 
JoinPartitioningType.BROADCAST, sideLeft,
                 sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, 
leftRangeId,
-                context.newRangeId(), ijea.getRangeMap()));
+                rightRangeId, ijea.getRangeMap()));
     }
 
     private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator 
op, FunctionIdentifier fi,
-            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IntervalJoinExpressionAnnotation ijea,
-            IOptimizationContext context) {
-        long leftCount = ijea.getLeftRecordCount() > 0 ? 
ijea.getLeftRecordCount() : getCardinality(sideLeft, context);
+            List<LogicalVariable> leftKeys, List<LogicalVariable> rightKeys, 
IntervalJoinExpressionAnnotation ijea,
+            IOptimizationContext context) throws AlgebricksException {
+        long leftCount = ijea.getLeftRecordCount() > 0 ? 
ijea.getLeftRecordCount() : getCardinality(leftKeys, context);
         long rightCount = ijea.getRightRecordCount() > 0 ? 
ijea.getRightRecordCount()
-                : getCardinality(sideRight, context);
+                : getCardinality(rightKeys, context);
         long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? 
ijea.getLeftMaxDuration()
-                : getMaxDuration(sideLeft, context);
+                : getMaxDuration(leftKeys, context);
         long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? 
ijea.getRightMaxDuration()
-                : getMaxDuration(sideRight, context);
+                : getMaxDuration(rightKeys, context);
         int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? 
ijea.getTuplesPerFrame()
                 : 
context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame();
 
+        int k = IntervalPartitionUtil.determineK(leftCount, leftMaxDuration, 
rightCount, rightMaxDuration,
+                tuplesPerFrame);
+        if (k <= 2) {
+            k = 3;
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("IntervalPartitionJoin has overridden the 
suggested value of k (" + k + ") with 3.");
+            }
+        }
+
         RangeId leftRangeId = context.newRangeId();
+        RangeId rightRangeId = context.newRangeId();
+        insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+        insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), 
context);
+
+        List<LogicalVariable> leftPartitionVar = 
Arrays.asList(context.newVar(), context.newVar());
+        List<LogicalVariable> rightPartitionVar = 
Arrays.asList(context.newVar(), context.newVar());
+        insertPartitionSortKey(op, LEFT, leftPartitionVar, leftKeys.get(0), 
leftRangeId, k, context);
+        insertPartitionSortKey(op, RIGHT, rightPartitionVar, rightKeys.get(0), 
rightRangeId, k, context);
+
         IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
         op.setPhysicalOperator(new 
IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
-                sideLeft, sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), leftCount,
-                rightCount, leftMaxDuration, rightMaxDuration, tuplesPerFrame, 
mjcf, leftRangeId, context.newRangeId(),
-                ijea.getRangeMap()));
+                leftKeys, rightKeys, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), k, mjcf,
+                leftPartitionVar, rightPartitionVar, leftRangeId, 
rightRangeId, ijea.getRangeMap()));
+    }
+
+    private static void insertRangeForward(AbstractBinaryJoinOperator op, int 
branch, RangeId rangeId,
+            IRangeMap rangeMap, IOptimizationContext context) throws 
AlgebricksException {
+        RangeForwardOperator rfo = new RangeForwardOperator(rangeId, rangeMap);
+        rfo.setExecutionMode(op.getExecutionMode());
+        rfo.getInputs().add(op.getInputs().get(branch));
+        RangeForwardPOperator rfpo = new RangeForwardPOperator(rangeId, 
rangeMap);
+        rfo.setPhysicalOperator(rfpo);
+        Mutable<ILogicalOperator> rfoRef = new MutableObject<>(rfo);
+        op.getInputs().set(branch, rfoRef);
+
+        context.computeAndSetTypeEnvironmentForOperator(rfo);
+    }
+
+    private static void insertPartitionSortKey(AbstractBinaryJoinOperator op, 
int branch,
+            List<LogicalVariable> partitionVars, LogicalVariable intervalVar, 
RangeId rangeId, int k,
+            IOptimizationContext context) throws AlgebricksException {
+        MutableObject<ILogicalExpression> intervalExp = new MutableObject<>(
+                new VariableReferenceExpression(intervalVar));
+        MutableObject<ILogicalExpression> rangeIdConstant = new 
MutableObject<>(
+                new ConstantExpression(new AsterixConstantValue(new 
AInt32(rangeId.getId()))));
+        MutableObject<ILogicalExpression> kConstant = new MutableObject<>(
+                new ConstantExpression(new AsterixConstantValue(new 
AInt32(k))));
+
+        List<Mutable<ILogicalExpression>> assignExps = new ArrayList<>();
+        // Start partition
+        IFunctionInfo startFi = 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_START);
+        @SuppressWarnings("unchecked")
+        ScalarFunctionCallExpression startPartitionExp = new 
ScalarFunctionCallExpression(startFi, intervalExp,
+                rangeIdConstant, kConstant);
+        assignExps.add(new 
MutableObject<ILogicalExpression>(startPartitionExp));
+        // End partition
+        IFunctionInfo endFi = 
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_END);
+        @SuppressWarnings("unchecked")
+        ScalarFunctionCallExpression endPartitionExp = new 
ScalarFunctionCallExpression(endFi, intervalExp,
+                rangeIdConstant, kConstant);
+        assignExps.add(new MutableObject<ILogicalExpression>(endPartitionExp));
+
+        AssignOperator ao = new AssignOperator(partitionVars, assignExps);
+        ao.setExecutionMode(op.getExecutionMode());
+        AssignPOperator apo = new AssignPOperator();
+        ao.setPhysicalOperator(apo);
+        Mutable<ILogicalOperator> aoRef = new MutableObject<>(ao);
+        ao.getInputs().add(op.getInputs().get(branch));
+        op.getInputs().set(branch, aoRef);
+
+        context.computeAndSetTypeEnvironmentForOperator(ao);
     }
 
     private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, 
FunctionIdentifier fi,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, 
IntervalJoinExpressionAnnotation ijea,
-            IOptimizationContext context) {
+            IOptimizationContext context) throws AlgebricksException {
         RangeId leftRangeId = context.newRangeId();
+        RangeId rightRangeId = context.newRangeId();
+        insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
+        insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), 
context);
+
         IIntervalMergeJoinCheckerFactory mjcf = 
getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
         op.setPhysicalOperator(new 
IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
                 sideLeft, sideRight, 
context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, 
leftRangeId,
-                context.newRangeId(), ijea.getRangeMap()));
+                rightRangeId, ijea.getRangeMap()));
     }
 
     private static int getMaxDuration(List<LogicalVariable> lv, 
IOptimizationContext context) {
@@ -202,8 +296,8 @@ public class JoinUtils {
             } else {
                 return null;
             }
-            ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
-            ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+            ILogicalExpression opLeft = 
fexp.getArguments().get(LEFT).getValue();
+            ILogicalExpression opRight = 
fexp.getArguments().get(RIGHT).getValue();
             if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
                     || opRight.getExpressionTag() != 
LogicalExpressionTag.VARIABLE) {
                 return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 69b9853..71719d5 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -261,6 +261,8 @@ import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalMetByDes
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlappedByDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlappingDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalOverlapsDescriptor;
+import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalPartitionJoinEndDescriptor;
+import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalPartitionJoinStartDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalStartedByDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.IntervalStartsDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.temporal.MillisecondsFromDayTimeDurationDescriptor;
@@ -401,6 +403,10 @@ public class FunctionCollection {
         temp.add(SimilarityJaccardPrefixDescriptor.FACTORY);
         temp.add(SimilarityJaccardPrefixCheckDescriptor.FACTORY);
 
+        // Partition functions for interval partition join pre-sorting
+        temp.add(IntervalPartitionJoinStartDescriptor.FACTORY);
+        temp.add(IntervalPartitionJoinEndDescriptor.FACTORY);
+
         // functions that need generated class for null-handling.
         List<IFunctionDescriptorFactory> functionsToInjectUnkownHandling = new 
ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 7309f0c..f4342ca 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -79,7 +79,7 @@ public class AsterixPropertiesAccessor {
                 fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
                 is = new FileInputStream(fileName);
             } catch (FileNotFoundException fnf) {
-                throw new AsterixException("Could not find configuration file 
" + fileName);
+                throw new AsterixException("Could not find configuration file 
" + fileName, fnf);
             }
         }
 
@@ -90,7 +90,7 @@ public class AsterixPropertiesAccessor {
             Unmarshaller unmarshaller = ctx.createUnmarshaller();
             asterixConfiguration = (AsterixConfiguration) 
unmarshaller.unmarshal(is);
         } catch (JAXBException e) {
-            throw new AsterixException("Failed to read configuration file " + 
fileName);
+            throw new AsterixException("Failed to read configuration file " + 
fileName, e);
         }
         instanceName = asterixConfiguration.getInstanceName();
         metadataNodeName = asterixConfiguration.getMetadataNode();
@@ -117,7 +117,7 @@ public class AsterixPropertiesAccessor {
             nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
         }
-        asterixConfigurationParams = new HashMap<String, Property>();
+        asterixConfigurationParams = new HashMap<>();
         for (Property p : asterixConfiguration.getProperty()) {
             asterixConfigurationParams.put(p.getName(), p);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
index e5950e3..2d159a6 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -525,6 +525,10 @@ public class AsterixBuiltinFunctions {
             "interval-ends", 2);
     public static final FunctionIdentifier INTERVAL_ENDED_BY = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "interval-ended-by", 2);
+    public static final FunctionIdentifier INTERVAL_PARTITION_JOIN_START = new 
FunctionIdentifier(
+            FunctionConstants.ASTERIX_NS, "interval-partition-join-start", 3);
+    public static final FunctionIdentifier INTERVAL_PARTITION_JOIN_END = new 
FunctionIdentifier(
+            FunctionConstants.ASTERIX_NS, "interval-partition-join-end", 3);
     public static final FunctionIdentifier CURRENT_TIME = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "current-time", 0);
     public static final FunctionIdentifier CURRENT_DATE = new 
FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1009,6 +1013,8 @@ public class AsterixBuiltinFunctions {
         addFunction(INTERVAL_COVERED_BY, ABooleanTypeComputer.INSTANCE, true);
         addFunction(INTERVAL_ENDS, ABooleanTypeComputer.INSTANCE, true);
         addFunction(INTERVAL_ENDED_BY, ABooleanTypeComputer.INSTANCE, true);
+        addPrivateFunction(INTERVAL_PARTITION_JOIN_START, 
AInt32TypeComputer.INSTANCE, true);
+        addPrivateFunction(INTERVAL_PARTITION_JOIN_END, 
AInt32TypeComputer.INSTANCE, true);
         addFunction(CURRENT_DATE, ADateTypeComputer.INSTANCE, false);
         addFunction(CURRENT_TIME, ATimeTypeComputer.INSTANCE, false);
         addFunction(CURRENT_DATETIME, ADateTimeTypeComputer.INSTANCE, false);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
index 1bbd745..034cfeb 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
@@ -35,12 +35,12 @@ public class FunctionManagerImpl implements 
IFunctionManager {
     private final Map<Pair<FunctionIdentifier, Integer>, 
IFunctionDescriptorFactory> functions;
 
     public FunctionManagerImpl() {
-        functions = new HashMap<Pair<FunctionIdentifier, Integer>, 
IFunctionDescriptorFactory>();
+        functions = new HashMap<>();
     }
 
     @Override
     public synchronized IFunctionDescriptor lookupFunction(FunctionIdentifier 
fid) throws AlgebricksException {
-        Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, 
Integer>(fid, fid.getArity());
+        Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, 
fid.getArity());
         IFunctionDescriptorFactory factory = functions.get(key);
         if (factory == null) {
             throw new AlgebricksException("Inappropriate use of function " + 
"'" + fid.getName() + "'");
@@ -58,7 +58,7 @@ public class FunctionManagerImpl implements IFunctionManager {
     public synchronized void unregisterFunction(IFunctionDescriptorFactory 
descriptorFactory)
             throws AlgebricksException {
         FunctionIdentifier fid = 
descriptorFactory.createFunctionDescriptor().getIdentifier();
-        Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, 
Integer>(fid, fid.getArity());
+        Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, 
fid.getArity());
         functions.remove(key);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
index c42865c..b053d99 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
@@ -63,9 +63,9 @@ import 
org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
  * <p/>
  */
 public class CalendarDurationFromDateTimeDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
-    private final static long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = 
AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
-    public final static IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+    private static final long serialVersionUID = 1L;
+    public static final FunctionIdentifier FID = 
AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
 
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
new file mode 100644
index 0000000..8d737f4
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinEndDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class IntervalPartitionJoinEndDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new IntervalPartitionJoinEndDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final 
IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final 
IHyracksTaskContext ctx) throws AlgebricksException {
+                return new IntervalPartitionJoinFunction(args, ctx, false);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_END;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
new file mode 100644
index 0000000..4725fa5
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinFunction.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import java.io.DataOutput;
+
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import 
org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import 
org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
+
+public class IntervalPartitionJoinFunction implements IScalarEvaluator {
+
+    private ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
+    private DataOutput out = resultStorage.getDataOutput();
+    private IPointable argPtr0 = new VoidPointable();
+    private IPointable argPtr1 = new VoidPointable();
+    private IPointable argPtr2 = new VoidPointable();
+    private int rangeIdCache = -1;
+    private long partitionStart;
+    private long partitionDuration;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<AInt32> intSerde = 
AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(BuiltinType.AINT32);
+    private AMutableInt32 aInt = new AMutableInt32(0);
+
+    private IHyracksTaskContext ctx;
+    private IScalarEvaluator eval0;
+    private IScalarEvaluator eval1;
+    private IScalarEvaluator eval2;
+    private boolean startPoint;
+
+    public IntervalPartitionJoinFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext ctx, boolean startPoint)
+            throws AlgebricksException {
+        this.ctx = ctx;
+        this.eval0 = args[0].createScalarEvaluator(ctx);
+        this.eval1 = args[1].createScalarEvaluator(ctx);
+        this.eval2 = args[2].createScalarEvaluator(ctx);
+        this.startPoint = startPoint;
+    }
+
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws 
AlgebricksException {
+        resultStorage.reset();
+        // Interval
+        eval0.evaluate(tuple, argPtr0);
+        // rangeId
+        eval1.evaluate(tuple, argPtr1);
+        // k
+        eval2.evaluate(tuple, argPtr2);
+
+        byte[] bytes0 = argPtr0.getByteArray();
+        int offset0 = argPtr0.getStartOffset();
+        byte[] bytes1 = argPtr1.getByteArray();
+        int offset1 = argPtr1.getStartOffset();
+        byte[] bytes2 = argPtr2.getByteArray();
+        int offset2 = argPtr2.getStartOffset();
+
+        try {
+            if (bytes0[offset0] != ATypeTag.SERIALIZED_INTERVAL_TYPE_TAG) {
+                throw new AlgebricksException("Expected type INTERVAL for 
parameter 0 but got "
+                        + 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes0[offset0]));
+            }
+
+            if (bytes1[offset1] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
+                throw new AlgebricksException("Expected type INT32 for 
parameter 1 but got "
+                        + 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes1[offset1]));
+            }
+
+            if (bytes2[offset2] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
+                throw new AlgebricksException("Expected type INT32 for 
parameter 2 but got "
+                        + 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes2[offset2]));
+            }
+
+            long point;
+            if (startPoint) {
+                point = 
AIntervalSerializerDeserializer.getIntervalStart(bytes0, offset0 + 1);
+            } else {
+                point = AIntervalSerializerDeserializer.getIntervalEnd(bytes0, 
offset0 + 1);
+            }
+            int rangeId = AInt32SerializerDeserializer.getInt(bytes1, offset1 
+ 1);
+            int k = AInt32SerializerDeserializer.getInt(bytes2, offset2 + 1);
+
+            if (rangeId != rangeIdCache) {
+                // Only load new values if the range changed.
+                RangeForwardTaskState rangeState = 
RangeForwardTaskState.getRangeState(rangeId, ctx);
+                IRangeMap rangeMap = rangeState.getRangeMap();
+                partitionStart = 
LongPointable.getLong(rangeMap.getMinByteArray(0), 
rangeMap.getMinStartOffset(0) + 1);
+                long partitionEnd = 
LongPointable.getLong(rangeMap.getMaxByteArray(0),
+                        rangeMap.getMaxStartOffset(0) + 1);
+                partitionDuration = 
IntervalPartitionUtil.getPartitionDuration(partitionStart, partitionEnd, k);
+                rangeIdCache = rangeId;
+            }
+
+            int partition = IntervalPartitionUtil.getIntervalPartition(point, 
partitionStart, partitionDuration, k);
+            aInt.setValue(partition);
+            intSerde.serialize(aInt, out);
+        } catch (HyracksDataException hex) {
+            throw new AlgebricksException(hex);
+        }
+        result.set(resultStorage);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
new file mode 100644
index 0000000..cc1d2f1
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/IntervalPartitionJoinStartDescriptor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.temporal;
+
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class IntervalPartitionJoinStartDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new IntervalPartitionJoinStartDescriptor();
+        }
+    };
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final 
IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final 
IHyracksTaskContext ctx) throws AlgebricksException {
+                return new IntervalPartitionJoinFunction(args, ctx, true);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_START;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
index a5f7770..880e181 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/OverlappingIntervalMergeJoinCheckerFactory.java
@@ -39,8 +39,7 @@ public class OverlappingIntervalMergeJoinCheckerFactory 
extends AbstractInterval
     public IIntervalMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] 
keys1, int partition,
             IHyracksTaskContext ctx) throws HyracksDataException {
         int fieldIndex = 0;
-        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx
-                .getStateObject(new RangeId(rangeId.getId(), ctx));
+        RangeForwardTaskState rangeState = 
RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
         IRangeMap rangeMap = rangeState.getRangeMap();
         if (ATypeTag.INT64.serialize() != rangeMap.getTag(0, 0)) {
             throw new HyracksDataException("Invalid range map type for 
interval merge join checker.");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
index d09a941..38ab073 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionComputerFactory.java
@@ -18,9 +18,6 @@
  */
 package org.apache.asterix.runtime.operators.joins.intervalpartition;
 
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -34,24 +31,12 @@ public class IntervalPartitionComputerFactory implements 
ITuplePartitionComputer
     private final long partitionStart;
     private final long partitionDuration;
 
-    private static final Logger LOGGER = 
Logger.getLogger(IntervalPartitionComputerFactory.class.getName());
-
     public IntervalPartitionComputerFactory(int intervalFieldId, int k, long 
partitionStart, long partitionEnd)
             throws HyracksDataException {
         this.intervalFieldId = intervalFieldId;
         this.k = k;
         this.partitionStart = partitionStart;
-        if (k <= 2) {
-            throw new HyracksDataException("k is to small for interval 
partitioner.");
-        }
-        long duration = (partitionEnd - partitionStart) / (k - 2);
-        if (duration <= 0) {
-            duration = 1;
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.fine("The interval partitioner using the smallest 
duration (1).");
-            }
-        }
-        partitionDuration = duration;
+        this.partitionDuration = 
IntervalPartitionUtil.getPartitionDuration(partitionStart, partitionEnd, k);
     }
 
     @Override
@@ -66,12 +51,7 @@ public class IntervalPartitionComputerFactory implements 
ITuplePartitionComputer
             }
 
             private int getIntervalPartition(long point) throws 
HyracksDataException {
-                if (point < partitionStart) {
-                    return 0;
-                }
-                long pointFloor = Math.floorDiv(point - partitionStart, 
partitionDuration);
-                // Add one to the partition, since 0 represents any point 
before the start partition point.
-                return (int) Math.min(pointFloor + 1, k - 1L);
+                return IntervalPartitionUtil.getIntervalPartition(point, 
partitionStart, partitionDuration, k);
             }
 
             public int getIntervalPartitionI(IFrameTupleAccessor accessor, int 
tIndex, int fieldId)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 4e1850c..6ea1e6f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -58,11 +58,8 @@ public class IntervalPartitionJoinOperatorDescriptor extends 
AbstractOperatorDes
     private final int[] probeKeys;
     private final int[] buildKeys;
 
-    private final long probeTupleCount;
-    private final long probeMaxDuration;
-    private final long buildTupleCount;
-    private final long buildMaxDuration;
-    private final int avgTuplesPerFrame;
+    private final int k;
+
     private final int probeKey;
     private final int buildKey;
     private final IIntervalMergeJoinCheckerFactory imjcf;
@@ -70,19 +67,14 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
 
     private static final Logger LOGGER = 
Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
 
-    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry 
spec, int memsize, long leftTupleCount,
-            long rightTupleCount, long leftMaxDuration, long rightMaxDuration, 
int avgTuplesPerFrame, int[] leftKeys,
+    public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry 
spec, int memsize, int k, int[] leftKeys,
             int[] rightKeys, RecordDescriptor recordDescriptor, 
IIntervalMergeJoinCheckerFactory imjcf,
             RangeId rangeId) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.buildKey = leftKeys[0];
         this.probeKey = rightKeys[0];
-        this.buildTupleCount = leftTupleCount;
-        this.probeTupleCount = rightTupleCount;
-        this.buildMaxDuration = leftMaxDuration;
-        this.probeMaxDuration = rightMaxDuration;
-        this.avgTuplesPerFrame = avgTuplesPerFrame;
+        this.k = k;
         this.buildKeys = leftKeys;
         this.probeKeys = rightKeys;
         recordDescriptors[0] = recordDescriptor;
@@ -136,8 +128,6 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
 
             final RecordDescriptor buildRd = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor probeRd = 
recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-            final int k = IntervalPartitionUtil.determineK(buildTupleCount, 
buildMaxDuration, probeTupleCount,
-                    probeMaxDuration, avgTuplesPerFrame);
 
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new 
BuildAndPartitionTaskState(
@@ -152,16 +142,8 @@ public class IntervalPartitionJoinOperatorDescriptor 
extends AbstractOperatorDes
                         throw new HyracksDataException("not enough memory for 
join");
                     }
                     state.k = k;
-                    if (k <= 2) {
-                        state.k = 3;
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("IntervalPartitionJoin has 
overridden the suggested value of k (" + state.k
-                                    + ") with 3.");
-                        }
-                    }
 
-                    RangeForwardTaskState rangeState = (RangeForwardTaskState) 
ctx
-                            .getStateObject(new RangeId(rangeId.getId(), ctx));
+                    RangeForwardTaskState rangeState = 
RangeForwardTaskState.getRangeState(rangeId.getId(), ctx);
                     long partitionStart = 
IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(),
                             partition);
                     long partitionEnd = 
IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 415feae..453287d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 
 public class IntervalPartitionUtil {
@@ -232,4 +233,25 @@ public class IntervalPartitionUtil {
         return inMemoryMap;
     }
 
+    public static long getPartitionDuration(long partitionStart, long 
partitionEnd, int k) throws HyracksDataException {
+        if (k <= 2) {
+            throw new HyracksDataException("k is to small for interval 
partitioner.");
+        }
+        long duration = (partitionEnd - partitionStart) / (k - 2);
+        if (duration <= 0) {
+            duration = 1;
+        }
+        return duration;
+    }
+
+    public static int getIntervalPartition(long point, long partitionStart, 
long partitionDuration, int k)
+            throws HyracksDataException {
+        if (point < partitionStart) {
+            return 0;
+        }
+        long pointFloor = Math.floorDiv(point - partitionStart, 
partitionDuration);
+        // Add one to the partition, since 0 represents any point before the 
start partition point.
+        return (int) Math.min(pointFloor + 1, k - 1L);
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index 2f2b7e3..fc43ecb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -27,7 +27,7 @@ public final class FunctionIdentifier implements Serializable 
{
     private final String name;
     private final int arity;
 
-    public final static int VARARGS = -1;
+    public static final int VARARGS = -1;
 
     public FunctionIdentifier(String namespace, String name) {
         this(namespace, name, VARARGS);
@@ -60,6 +60,7 @@ public final class FunctionIdentifier implements Serializable 
{
         return name.hashCode() + namespace.hashCode();
     }
 
+    @Override
     public String toString() {
         return getNamespace() + ":" + name;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 873b847..93d878c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -290,6 +290,7 @@ public class SchemaVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void
 
     @Override
     public Void visitRangeForwardOperator(RangeForwardOperator op, Void arg) 
throws AlgebricksException {
+        standardLayout(op);
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f3adffd..5735e9a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -585,8 +585,7 @@ public class EnforceStructuralPropertiesRule implements 
IAlgebraicRewriteRule {
                     List<ILocalStructuralProperty> cldLocals = 
deliveredByChild.getLocalProperties();
                     List<ILocalStructuralProperty> reqdLocals = 
required.getLocalProperties();
 
-                    // Add RangeForwardOperator.
-                    addRangeForwardOperator(op.getInputs().get(i), 
opp.getRangeId(), opp.getRangeMapHint(), context);
+                    // The RangeForwardOperator should already be in the plan.
 
                     boolean propWasSet = false;
                     pop = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/1487f2be/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index 6bc129e..c670b6b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -48,10 +48,10 @@ import 
org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
  */
 public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
 
-    private final Set<LogicalVariable> usedVars = new 
HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> liveVars = new 
HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> producedVars = new 
HashSet<LogicalVariable>();
-    private final List<LogicalVariable> projectVars = new 
ArrayList<LogicalVariable>();
+    private final Set<LogicalVariable> usedVars = new HashSet<>();
+    private final Set<LogicalVariable> liveVars = new HashSet<>();
+    private final Set<LogicalVariable> producedVars = new HashSet<>();
+    private final List<LogicalVariable> projectVars = new ArrayList<>();
     protected boolean hasRun = false;
 
     @Override
@@ -78,7 +78,7 @@ public class IntroduceProjectsRule implements 
IAlgebraicRewriteRule {
         VariableUtilities.getUsedVariables(op, usedVars);
 
         // In the top-down pass, maintain a set of variables that are used in 
op and all its parents.
-        HashSet<LogicalVariable> parentsUsedVars = new 
HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> parentsUsedVars = new HashSet<>();
         parentsUsedVars.addAll(parentUsedVars);
         parentsUsedVars.addAll(usedVars);
 
@@ -115,7 +115,7 @@ public class IntroduceProjectsRule implements 
IAlgebraicRewriteRule {
                 ILogicalOperator childOp = op.getInputs().get(i).getValue();
                 liveVars.clear();
                 VariableUtilities.getLiveVariables(childOp, liveVars);
-                List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+                List<LogicalVariable> vars = new ArrayList<>();
                 vars.addAll(projectVars);
                 // Only retain those variables that are live in the i-th input 
branch.
                 vars.retainAll(liveVars);
@@ -132,8 +132,8 @@ public class IntroduceProjectsRule implements 
IAlgebraicRewriteRule {
             liveVars.clear();
             
VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
             ProjectOperator projectOp = (ProjectOperator) op;
-            List<LogicalVariable> projectVars = projectOp.getVariables();
-            if (liveVars.size() == projectVars.size() && 
liveVars.containsAll(projectVars)) {
+            List<LogicalVariable> projectVarsTemp = projectOp.getVariables();
+            if (liveVars.size() == projectVarsTemp.size() && 
liveVars.containsAll(projectVarsTemp)) {
                 boolean eliminateProject = true;
                 // For UnionAll the variables must also be in exactly the 
correct order.
                 if (parentOp.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
@@ -155,7 +155,7 @@ public class IntroduceProjectsRule implements 
IAlgebraicRewriteRule {
 
     private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, 
ProjectOperator projectOp,
             int unionInputIndex) throws AlgebricksException {
-        List<LogicalVariable> orderedLiveVars = new 
ArrayList<LogicalVariable>();
+        List<LogicalVariable> orderedLiveVars = new ArrayList<>();
         
VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), 
orderedLiveVars);
         int numVars = orderedLiveVars.size();
         for (int i = 0; i < numVars; i++) {

Reply via email to