snapshot super interval dag.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/aea7fe87
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/aea7fe87
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/aea7fe87

Branch: refs/heads/ecarm002/interval_join_merge
Commit: aea7fe87d3c2d03902a9e207e1afa457733e5171
Parents: 199bddd
Author: Preston Carman <prest...@apache.org>
Authored: Thu Jun 30 16:03:28 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Thu Jun 30 16:03:28 2016 -0700

----------------------------------------------------------------------
 .../IntervalLocalRangeSplitterOperator.java     |  69 +++
 .../physical/AbstractIntervalJoinPOperator.java |  17 +
 .../physical/IntervalIndexJoinPOperator.java    |   1 -
 .../physical/IntervalJoinPOperator.java         |  63 +++
 .../IntervalLocalRangeOperatorDescriptor.java   | 197 ++++++++
 .../IntervalLocalRangeSplitterPOperator.java    | 101 +++++
 .../IntervalPartitionJoinPOperator.java         |  20 +
 .../asterix/optimizer/base/RuleCollections.java |   2 +
 .../rules/IntervalSplitPartitioningRule.java    | 445 +++++++++++++++++++
 ...IntervalPartitionJoinOperatorDescriptor.java |   3 -
 .../logical/AbstractLogicalOperator.java        |  16 +-
 .../operators/logical/ReplicateOperator.java    |  12 +-
 .../operators/physical/MergeJoinPOperator.java  |  16 +
 .../operators/physical/NLJoinPOperator.java     | 295 ------------
 .../physical/NestedLoopJoinPOperator.java       | 295 ++++++++++++
 .../RangePartitionExchangePOperator.java        |   8 +
 .../RangePartitionMergeExchangePOperator.java   |   8 +
 .../operators/physical/ReplicatePOperator.java  |   2 +-
 .../operators/physical/UnionAllPOperator.java   |   3 -
 .../typing/PropagatingTypeEnvironment.java      |   4 +-
 .../algebricks/rewriter/util/JoinUtils.java     |   6 +-
 .../std/misc/SplitOperatorDescriptor.java       |   6 +-
 22 files changed, 1259 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
new file mode 100644
index 0000000..9ae9f7d
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/IntervalLocalRangeSplitterOperator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.algebra.operators;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+public class IntervalLocalRangeSplitterOperator extends 
AbstractExtensibleLogicalOperator {
+
+    private final List<LogicalVariable> joinKeyLogicalVars;
+
+    public IntervalLocalRangeSplitterOperator(List<LogicalVariable> 
joinKeyLogicalVars) {
+        this.joinKeyLogicalVars = joinKeyLogicalVars;
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public IOperatorExtension newInstance() {
+        return new IntervalLocalRangeSplitterOperator(joinKeyLogicalVars);
+    }
+
+    @Override
+    public boolean 
acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "IntervalLocalRangeSplitterOperator";
+    }
+
+    @Override
+    public void getUsedVariables(Collection<LogicalVariable> usedVars) {
+        usedVars.addAll(joinKeyLogicalVars);
+    }
+
+    @Override
+    public void getProducedVariables(Collection<LogicalVariable> producedVars) 
{
+        // No produced variables.
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
index 146acd5..ca50f1b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AbstractIntervalJoinPOperator.java
@@ -67,6 +67,22 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
         this.rangeMap = rangeMap;
     }
 
+    public List<LogicalVariable> getKeysLeftBranch() {
+        return keysLeftBranch;
+    }
+
+    public List<LogicalVariable> getKeysRightBranch() {
+        return keysRightBranch;
+    }
+
+    public IIntervalMergeJoinCheckerFactory 
getIntervalMergeJoinCheckerFactory() {
+        return mjcf;
+    }
+
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.EXTENSION_OPERATOR;
@@ -154,4 +170,5 @@ public abstract class AbstractIntervalJoinPOperator extends AbstractJoinPOperato
     abstract IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, 
int[] keysRight,
             IOperatorDescriptorRegistry spec, RecordDescriptor 
recordDescriptor, IIntervalMergeJoinCheckerFactory mjcf,
             IRangeMap rangeMap);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java
index 97aaafc..32a0f56 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalIndexJoinPOperator.java
@@ -59,5 +59,4 @@ public class IntervalIndexJoinPOperator extends AbstractIntervalJoinPOperator {
         return new IntervalIndexJoinOperatorDescriptor(spec, memSizeInFrames, 
keysLeft, keysRight, recordDescriptor,
                 mjcf);
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java
new file mode 100644
index 0000000..528822d
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalJoinPOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.operators.physical;
+
+import java.util.List;
+import java.util.logging.Logger;
+
+import 
org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
+import 
org.apache.asterix.runtime.operators.joins.intervalindex.IntervalIndexJoinOperatorDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+public class IntervalJoinPOperator extends AbstractIntervalJoinPOperator {
+
+    private final int memSizeInFrames;
+
+    private static final Logger LOGGER = 
Logger.getLogger(IntervalJoinPOperator.class.getName());
+
+    public IntervalJoinPOperator(JoinKind kind, JoinPartitioningType 
partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> 
sideRightOfEqualities,
+            int memSizeInFrames, IIntervalMergeJoinCheckerFactory mjcf, 
IRangeMap rangeMap) {
+        super(kind, partitioningType, sideLeftOfEqualities, 
sideRightOfEqualities, mjcf, rangeMap);
+        this.memSizeInFrames = memSizeInFrames;
+
+        LOGGER.fine("IntervalJoinPOperator constructed with: JoinKind=" + kind 
+ ", JoinPartitioningType="
+                + partitioningType + ", List<LogicalVariable>=" + 
sideLeftOfEqualities + ", List<LogicalVariable>="
+                + sideRightOfEqualities + ", int memSizeInFrames=" + 
memSizeInFrames
+                + ", IMergeJoinCheckerFactory mjcf=" + mjcf + ", IRangeMap 
rangeMap=" + rangeMap + ".");
+    }
+
+    @Override
+    public String getIntervalJoin() {
+        return "INTERVAL_JOIN";
+    }
+
+    @Override
+    IOperatorDescriptor getIntervalOperatorDescriptor(int[] keysLeft, int[] 
keysRight, IOperatorDescriptorRegistry spec,
+            RecordDescriptor recordDescriptor, 
IIntervalMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
+        return new IntervalIndexJoinOperatorDescriptor(spec, memSizeInFrames, 
keysLeft, keysRight, recordDescriptor,
+                mjcf);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
new file mode 100644
index 0000000..392bf43
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeOperatorDescriptor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.operators.physical;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.runtime.operators.joins.IntervalJoinUtil;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class IntervalLocalRangeOperatorDescriptor extends 
AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private static final int PARTITION_ACTIVITY_ID = 0;
+
+    private static final int INPUT_STARTS = 0;
+    private static final int INPUT_COVERS = 2;
+    private static final int INPUT_ENDS = 1;
+
+//    private static final int INPUT_STARTS = 0;
+//    private static final int INPUT_COVERS = 0;
+//    private static final int INPUT_ENDS = 0;
+
+    private final int key;
+    private final IRangeMap rangeMap;
+
+    public IntervalLocalRangeOperatorDescriptor(IOperatorDescriptorRegistry 
spec, int[] keys,
+            RecordDescriptor recordDescriptor, IRangeMap rangeMap) {
+        super(spec, 1, 3);
+        key = keys[0];
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ActivityId aid = new ActivityId(odId, PARTITION_ACTIVITY_ID);
+        IActivity phase = new PartitionActivityNode(aid);
+
+        builder.addActivity(this, phase);
+        builder.addSourceEdge(0, phase, 0);
+        // Connect output
+        builder.addTargetEdge(0, phase, 0);
+        builder.addTargetEdge(1, phase, 1);
+        builder.addTargetEdge(2, phase, 2);
+    }
+
+    private final class PartitionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public PartitionActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final 
IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int 
partition, int nPartitions) {
+            return new AbstractUnaryInputOperatorNodePushable() {
+                private final IFrameWriter[] writers = new 
IFrameWriter[getOutputArity()];
+                private final FrameTupleAppender[] resultAppender = new 
FrameTupleAppender[getOutputArity()];
+                private final RecordDescriptor rd = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+                private final FrameTupleAccessor accessor = new 
FrameTupleAccessor(rd);
+                private final long nodeRangeStart = getPartitionBoundryStart();
+                private final long nodeRangeEnd = getPartitionBoundryEnd();
+
+                @Override
+                public void close() throws HyracksDataException {
+                    flush();
+                    for (int i = 0; i < getOutputArity(); i++) {
+                        writers[i].close();
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < getOutputArity(); i++) {
+                        resultAppender[i].flush(writers[i]);
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    for (int i = 0; i < getOutputArity(); i++) {
+                        writers[i].fail();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                    accessor.reset(buffer);
+                    int tupleCount = accessor.getTupleCount();
+                    for (int i = 0; i < tupleCount; i++) {
+                        int pid = localPartition(accessor, i, key);
+                        if (pid < outputArity) {
+                            FrameUtils.appendToWriter(writers[pid], 
resultAppender[pid], accessor, i);
+                        }
+                    }
+                }
+
+                private int localPartition(FrameTupleAccessor accessor, int i, 
int key) {
+                    long start = IntervalJoinUtil.getIntervalStart(accessor, 
i, key);
+                    if (start < nodeRangeStart) {
+                        long end = IntervalJoinUtil.getIntervalEnd(accessor, 
i, key);
+                        if (end < nodeRangeEnd) {
+                            // Ends
+                            return INPUT_ENDS;
+                        } else {
+                            // Covers (match with all intervals)
+                            return INPUT_COVERS;
+                        }
+                    } else {
+                        // Start (responsible for matches)
+                        return INPUT_STARTS;
+                    }
+                }
+
+                @Override
+                public void open() throws HyracksDataException {
+                    for (int i = 0; i < getOutputArity(); i++) {
+                        writers[i].open();
+                        resultAppender[i] = new FrameTupleAppender(new 
VSizeFrame(ctx), true);
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter 
writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+
+                long getPartitionBoundryStart() {
+                    int fieldIndex = 0;
+                    int slot = partition - 1;
+                    long boundary = Long.MIN_VALUE;
+                    // All lookups are on typed values.
+                    if (partition == 0) {
+                        boundary = 
LongPointable.getLong(rangeMap.getMinByteArray(fieldIndex),
+                                rangeMap.getMinStartOffset(fieldIndex) + 1);
+                    } else if (partition <= rangeMap.getSplitCount()) {
+                        boundary = 
LongPointable.getLong(rangeMap.getByteArray(fieldIndex, slot),
+                                rangeMap.getStartOffset(fieldIndex, slot) + 1);
+                    } else if (partition > rangeMap.getSplitCount()) {
+                        boundary = 
LongPointable.getLong(rangeMap.getMaxByteArray(fieldIndex),
+                                rangeMap.getMaxStartOffset(fieldIndex) + 1);
+                    }
+                    return boundary;
+                }
+
+                long getPartitionBoundryEnd() {
+                    int fieldIndex = 0;
+                    int slot = partition;
+                    long boundary = Long.MAX_VALUE;
+                    // All lookups are on typed values.
+                    if (partition < rangeMap.getSplitCount()) {
+                        boundary = 
LongPointable.getLong(rangeMap.getByteArray(fieldIndex, slot),
+                                rangeMap.getStartOffset(fieldIndex, slot) + 1);
+                    } else if (partition == rangeMap.getSplitCount()) {
+                        boundary = 
LongPointable.getLong(rangeMap.getMaxByteArray(fieldIndex),
+                                rangeMap.getMaxStartOffset(fieldIndex) + 1);
+                    }
+                    return boundary;
+                }
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
new file mode 100644
index 0000000..1150b91
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalLocalRangeSplitterPOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+public class IntervalLocalRangeSplitterPOperator extends 
AbstractPhysicalOperator {
+
+    private List<LogicalVariable> intervalFields;
+    private IRangeMap rangeMap;
+
+    public IntervalLocalRangeSplitterPOperator(List<LogicalVariable> 
intervalFields, IRangeMap rangeMap) {
+        this.intervalFields = intervalFields;
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) 
op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public String toString() {
+        return "IntervalLocalRangeSplitterPOperator " + intervalFields;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        int[] keys = JobGenHelper.variablesToFieldIndexes(intervalFields, 
inputSchemas[0]);
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recordDescriptor = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        IOperatorDescriptor opDesc = new 
IntervalLocalRangeOperatorDescriptor(spec, keys, recordDescriptor, rangeMap);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+        // and contribute one edge from its child
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
index c52a9cd..414d0b4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IntervalPartitionJoinPOperator.java
@@ -62,6 +62,26 @@ public class IntervalPartitionJoinPOperator extends AbstractIntervalJoinPOperato
                 + ".");
     }
 
+    public int getProbeTupleCount() {
+        return probeTupleCount;
+    }
+
+    public int getProbeMaxDuration() {
+        return probeMaxDuration;
+    }
+
+    public int getBuildTupleCount() {
+        return buildTupleCount;
+    }
+
+    public int getBuildMaxDuration() {
+        return buildMaxDuration;
+    }
+
+    public int getAvgTuplesInFrame() {
+        return avgTuplesInFrame;
+    }
+
     @Override
     public String getIntervalJoin() {
         return "INTERVAL_PARTITION_JOIN";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index a2f8430..5fcfc94 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -39,6 +39,7 @@ import org.apache.asterix.optimizer.rules.FeedScanCollectionToUnnest;
 import org.apache.asterix.optimizer.rules.FuzzyEqRule;
 import org.apache.asterix.optimizer.rules.IfElseToSwitchCaseFunctionRule;
 import org.apache.asterix.optimizer.rules.InlineUnnestFunctionRule;
+import org.apache.asterix.optimizer.rules.IntervalSplitPartitioningRule;
 import org.apache.asterix.optimizer.rules.IntroduceAutogenerateIDRule;
 import 
org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule;
 import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
@@ -332,6 +333,7 @@ public final class RuleCollections {
         prepareForJobGenRewrites
                 .add(new 
IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
         prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
+        prepareForJobGenRewrites.add(new IntervalSplitPartitioningRule());
         // Re-infer all types, so that, e.g., the effect of not-is-null is
         // propagated.
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
new file mode 100644
index 0000000..a6f49f9
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntervalSplitPartitioningRule.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.algebra.operators.IntervalLocalRangeSplitterOperator;
+import 
org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
+import 
org.apache.asterix.algebra.operators.physical.IntervalLocalRangeSplitterPOperator;
+import 
org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import 
org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+
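+/**
+ * Rewrites an interval join over sorted, range-partitioned inputs into a DAG of joins whose
+ * results are combined with union-all operators.
+ *
+ * Before (the plan shape this rule looks for):
+ *
+ * <pre>
+ * inner join (MERGE_JOIN or extension physical operator)
+ *   Left:  exchange over order over range-partition exchange (SPLIT)
+ *   Right: exchange over order over range-partition exchange (SPLIT)
+ * </pre>
+ *
+ * After:
+ *
+ * <pre>
+ * union-all chain (four union-all operators) over five joins:
+ *   starts join (the original join):  left replicate       with right replicate
+ *   left ends join (copy of join):    left interval split  with right replicate
+ *   left covers join (nested loop):   left interval split  with right replicate
+ *   right ends join (copy of join):   left replicate       with right interval split
+ *   right covers join (nested loop):  left replicate       with right interval split
+ *
+ *   Left:  replicate over interval local range split over the original left exchange
+ *   Right: replicate over interval local range split over the original right exchange
+ * </pre>
+ */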
+public class IntervalSplitPartitioningRule implements IAlgebraicRewriteRule {
+
+    private static final int LEFT = 0;
+    private static final int RIGHT = 1;
+
+    private static final int START_SPLITS = 3;
+
+    private static final Set<FunctionIdentifier> intervalJoinConditions = new 
HashSet<>();
+    static {
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_AFTER);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_BEFORE);
+        
intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_COVERED_BY);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_COVERS);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_ENDED_BY);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_ENDS);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_MEETS);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_MET_BY);
+        
intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY);
+        
intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPPING);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_OVERLAPS);
+        
intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_STARTED_BY);
+        intervalJoinConditions.add(AsterixBuiltinFunctions.INTERVAL_STARTS);
+    }
+
+    /**
+     * This rule performs no rewriting in this callback; it unconditionally reports that the plan
+     * was left unchanged.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Rewrites an interval join whose two inputs are sorted and SPLIT range partitioned into a
+     * chain of union-all operators over five joins: the original ("starts") join fed by replicated
+     * inputs, plus an "ends" join (a copy of the original physical join) and a "covers" join
+     * (nested loop) for each side's interval split output.
+     *
+     * @param opRef
+     *            reference to the operator being visited; on success it is re-pointed at the top
+     *            union-all operator
+     * @param context
+     *            optimization context used to compute type environments for the new operators
+     * @return {@code true} if the plan was rewritten, {@code false} if the operator does not match
+     *         the expected shape (in which case the plan is left untouched)
+     * @throws AlgebricksException
+     *             if a type environment cannot be computed
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (!isIntervalJoin(op)) {
+            return false;
+        }
+        InnerJoinOperator startsJoin = (InnerJoinOperator) op;
+        ExecutionMode mode = startsJoin.getExecutionMode();
+        // The original join needs its own reference: opRef is re-pointed at the final union below,
+        // so reusing opRef here would make the union chain (indirectly) its own input and drop
+        // the original join from the plan.
+        Mutable<ILogicalOperator> startsJoinRef = new MutableObject<>(op);
+        Set<LogicalVariable> localLiveVars = new ListSet<>();
+        VariableUtilities.getLiveVariables(op, localLiveVars);
+
+        // Both join inputs must be exchanges; bail out if either one is not.
+        Mutable<ILogicalOperator> leftSortedInput = op.getInputs().get(0);
+        Mutable<ILogicalOperator> rightSortedInput = op.getInputs().get(1);
+        if (leftSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE
+                || rightSortedInput.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
+            return false;
+        }
+
+        // Both exchanges must sit directly on top of an order operator sorting on a variable.
+        Mutable<ILogicalOperator> leftSorter = leftSortedInput.getValue().getInputs().get(0);
+        Mutable<ILogicalOperator> rightSorter = rightSortedInput.getValue().getInputs().get(0);
+        if (leftSorter.getValue().getOperatorTag() != LogicalOperatorTag.ORDER
+                || rightSorter.getValue().getOperatorTag() != LogicalOperatorTag.ORDER) {
+            return false;
+        }
+        LogicalVariable leftSortKey = getSortKey(leftSorter.getValue());
+        LogicalVariable rightSortKey = getSortKey(rightSorter.getValue());
+        if (leftSortKey == null || rightSortKey == null) {
+            return false;
+        }
+
+        // Both sorters must be fed by a SPLIT range partitioning exchange.
+        Mutable<ILogicalOperator> leftRangeInput = leftSorter.getValue().getInputs().get(0);
+        Mutable<ILogicalOperator> rightRangeInput = rightSorter.getValue().getInputs().get(0);
+        IRangeMap leftRangeMap = getRangeMapForBranch(leftRangeInput.getValue());
+        IRangeMap rightRangeMap = getRangeMapForBranch(rightRangeInput.getValue());
+        if (leftRangeMap == null || rightRangeMap == null) {
+            return false;
+        }
+        // TODO check physical join
+
+        // The join condition must be an interval function applied to a variable on each side.
+        LogicalVariable leftJoinKey = getJoinKey(startsJoin.getCondition().getValue(), LEFT);
+        LogicalVariable rightJoinKey = getJoinKey(startsJoin.getCondition().getValue(), RIGHT);
+        if (leftJoinKey == null || rightJoinKey == null) {
+            return false;
+        }
+
+        // Interval local partition operators; each branch is split with its own range map.
+        ILogicalOperator leftIntervalSplit = getIntervalSplitOperator(leftSortKey, leftRangeMap, mode);
+        Mutable<ILogicalOperator> leftIntervalSplitRef = new MutableObject<>(leftIntervalSplit);
+        ILogicalOperator rightIntervalSplit = getIntervalSplitOperator(rightSortKey, rightRangeMap, mode);
+        Mutable<ILogicalOperator> rightIntervalSplitRef = new MutableObject<>(rightIntervalSplit);
+
+        // Replicate operators
+        ReplicateOperator leftStartsSplit = getReplicateOperator(START_SPLITS, mode);
+        Mutable<ILogicalOperator> leftStartsSplitRef = new MutableObject<>(leftStartsSplit);
+        ReplicateOperator rightStartsSplit = getReplicateOperator(START_SPLITS, mode);
+        Mutable<ILogicalOperator> rightStartsSplitRef = new MutableObject<>(rightStartsSplit);
+
+        // Covers Join Operator
+        ILogicalOperator leftCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+        Mutable<ILogicalOperator> leftCoversJoinRef = new MutableObject<>(leftCoversJoin);
+        ILogicalOperator rightCoversJoin = getNestedLoop(startsJoin.getCondition(), context, mode);
+        Mutable<ILogicalOperator> rightCoversJoinRef = new MutableObject<>(rightCoversJoin);
+
+        // Ends Join Operator. This is the last point at which the rule may give up: nothing in
+        // the existing plan has been modified yet.
+        ILogicalOperator leftEndsJoin = getIntervalJoin(startsJoin, context, mode);
+        ILogicalOperator rightEndsJoin = getIntervalJoin(startsJoin, context, mode);
+        if (leftEndsJoin == null || rightEndsJoin == null) {
+            return false;
+        }
+        Mutable<ILogicalOperator> leftEndsJoinRef = new MutableObject<>(leftEndsJoin);
+        Mutable<ILogicalOperator> rightEndsJoinRef = new MutableObject<>(rightEndsJoin);
+
+        // Union All Operator
+        ILogicalOperator union1 = getUnionOperator(localLiveVars, mode);
+        Mutable<ILogicalOperator> union1Ref = new MutableObject<>(union1);
+        ILogicalOperator union2 = getUnionOperator(localLiveVars, mode);
+        Mutable<ILogicalOperator> union2Ref = new MutableObject<>(union2);
+        ILogicalOperator union3 = getUnionOperator(localLiveVars, mode);
+        Mutable<ILogicalOperator> union3Ref = new MutableObject<>(union3);
+        ILogicalOperator union4 = getUnionOperator(localLiveVars, mode);
+        Mutable<ILogicalOperator> union4Ref = new MutableObject<>(union4);
+
+        // Connect main path
+        connectOperators(leftIntervalSplitRef, leftSortedInput, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftIntervalSplit);
+        connectOperators(leftStartsSplitRef, leftIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftStartsSplit);
+        connectOperators(rightIntervalSplitRef, rightSortedInput, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightIntervalSplit);
+        connectOperators(rightStartsSplitRef, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightStartsSplit);
+        updateConnections(startsJoinRef, leftStartsSplitRef, context, LEFT);
+        updateConnections(startsJoinRef, rightStartsSplitRef, context, RIGHT);
+        context.computeAndSetTypeEnvironmentForOperator(startsJoin);
+        leftStartsSplit.getOutputs().add(startsJoinRef);
+        rightStartsSplit.getOutputs().add(startsJoinRef);
+
+        // Connect left ends path
+        connectOperators(leftEndsJoinRef, leftIntervalSplitRef, context);
+        connectOperators(leftEndsJoinRef, rightStartsSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftEndsJoin);
+        connectOperators(union1Ref, leftEndsJoinRef, context);
+        connectOperators(union1Ref, startsJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(union1);
+        rightStartsSplit.getOutputs().add(leftEndsJoinRef);
+
+        // Connect left covers path
+        connectOperators(leftCoversJoinRef, leftIntervalSplitRef, context);
+        connectOperators(leftCoversJoinRef, rightStartsSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(leftCoversJoin);
+        connectOperators(union2Ref, union1Ref, context);
+        connectOperators(union2Ref, leftCoversJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(union2);
+        rightStartsSplit.getOutputs().add(leftCoversJoinRef);
+
+        // Connect right ends path
+        connectOperators(rightEndsJoinRef, leftStartsSplitRef, context);
+        connectOperators(rightEndsJoinRef, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightEndsJoin);
+        connectOperators(union3Ref, union2Ref, context);
+        connectOperators(union3Ref, rightEndsJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(union3);
+        leftStartsSplit.getOutputs().add(rightEndsJoinRef);
+
+        // Connect right covers path
+        connectOperators(rightCoversJoinRef, leftStartsSplitRef, context);
+        connectOperators(rightCoversJoinRef, rightIntervalSplitRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(rightCoversJoin);
+        connectOperators(union4Ref, union3Ref, context);
+        connectOperators(union4Ref, rightCoversJoinRef, context);
+        context.computeAndSetTypeEnvironmentForOperator(union4);
+        leftStartsSplit.getOutputs().add(rightCoversJoinRef);
+
+        // Update context: the visited position in the plan now holds the top union.
+        opRef.setValue(union4);
+        return true;
+    }
+
+    /**
+     * Extracts the variable used by the first order expression of an order operator.
+     *
+     * @param op
+     *            the operator to inspect
+     * @return the variable referenced by the first order expression, or {@code null} when
+     *         {@code op} is not an ORDER operator, has no order expressions, or its first order
+     *         expression is not a plain variable reference
+     */
+    private LogicalVariable getSortKey(ILogicalOperator op) {
+        if (op.getOperatorTag() != LogicalOperatorTag.ORDER) {
+            return null;
+        }
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> order = ((OrderOperator) op).getOrderExpressions();
+        // Guard against an order operator without expressions instead of failing on get(0).
+        if (order.isEmpty()) {
+            return null;
+        }
+        ILogicalExpression sortExpr = order.get(0).second.getValue();
+        if (sortExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            return ((VariableReferenceExpression) sortExpr).getVariableReference();
+        }
+        return null;
+    }
+
+    /**
+     * Looks up the variable passed as one argument of an interval join condition.
+     *
+     * @param expr
+     *            the join condition
+     * @param branch
+     *            index of the function argument to inspect ({@code LEFT} or {@code RIGHT})
+     * @return the referenced variable, or {@code null} when the condition is not a function call,
+     *         the function is not one of the known interval join conditions, or the selected
+     *         argument is not a variable reference
+     */
+    private LogicalVariable getJoinKey(ILogicalExpression expr, int branch) {
+        if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression call = (AbstractFunctionCallExpression) expr;
+            // Only the interval functions registered in intervalJoinConditions qualify.
+            if (intervalJoinConditions.contains(call.getFunctionIdentifier())) {
+                ILogicalExpression argument = call.getArguments().get(branch).getValue();
+                if (argument instanceof VariableReferenceExpression) {
+                    return ((VariableReferenceExpression) argument).getVariableReference();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Appends {@code to} as a new input of the operator held by {@code from}. When {@code to} is
+     * not already an exchange, a one-to-one exchange is placed between the two operators.
+     *
+     * @param from
+     *            reference to the consuming operator (the one that receives a new input)
+     * @param to
+     *            reference to the producing operator
+     * @param context
+     *            optimization context used to recompute the affected type environments
+     * @throws AlgebricksException
+     *             if a type environment cannot be computed
+     */
+    private void connectOperators(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
+            IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator consumer = from.getValue();
+        if (to.getValue().getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+            // The producer is already an exchange: attach it directly.
+            consumer.getInputs().add(to);
+        } else {
+            ILogicalOperator exchange = getExchangeOperator(consumer.getExecutionMode());
+            exchange.getInputs().add(to);
+            consumer.getInputs().add(new MutableObject<>(exchange));
+            context.computeAndSetTypeEnvironmentForOperator(exchange);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(consumer);
+    }
+
+    /**
+     * Replaces the input at position {@code index} of the operator held by {@code from} with
+     * {@code to}. As in {@code connectOperators}, a one-to-one exchange is placed in between
+     * unless {@code to} is already an exchange.
+     *
+     * @param from
+     *            reference to the consuming operator whose input is replaced
+     * @param to
+     *            reference to the new producing operator
+     * @param context
+     *            optimization context used to recompute the affected type environments
+     * @param index
+     *            position of the input to replace
+     * @throws AlgebricksException
+     *             if a type environment cannot be computed
+     */
+    private void updateConnections(Mutable<ILogicalOperator> from, Mutable<ILogicalOperator> to,
+            IOptimizationContext context, int index) throws AlgebricksException {
+        ILogicalOperator consumer = from.getValue();
+        // Whether an exchange is needed depends on the operator being attached, not on the
+        // consumer.
+        if (to.getValue().getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
+            ILogicalOperator eo = getExchangeOperator(consumer.getExecutionMode());
+            Mutable<ILogicalOperator> eoRef = new MutableObject<>(eo);
+            eo.getInputs().add(to);
+            consumer.getInputs().set(index, eoRef);
+            // The consumer's environment is derived from its inputs, so compute the exchange first.
+            context.computeAndSetTypeEnvironmentForOperator(eo);
+        } else {
+            consumer.getInputs().set(index, to);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(consumer);
+    }
+
+    /**
+     * Builds an exchange operator backed by a one-to-one exchange physical operator.
+     *
+     * @param mode
+     *            execution mode assigned to the new exchange
+     * @return the new exchange operator (no inputs attached yet)
+     */
+    private ILogicalOperator getExchangeOperator(ExecutionMode mode) {
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchange.setExecutionMode(mode);
+        return exchange;
+    }
+
+    /**
+     * Builds the interval local range splitter for one join branch, wrapped in an extension
+     * operator.
+     *
+     * @param key
+     *            variable the splitter operates on
+     * @param rangeMap
+     *            range map handed to the physical splitter
+     * @param mode
+     *            execution mode assigned to both the splitter and its wrapper
+     * @return the extension operator wrapping the splitter (no inputs attached yet)
+     */
+    private ILogicalOperator getIntervalSplitOperator(LogicalVariable key, IRangeMap rangeMap, ExecutionMode mode) {
+        List<LogicalVariable> splitKeys = new ArrayList<>();
+        splitKeys.add(key);
+
+        // Logical splitter and its physical counterpart share the same key list.
+        IntervalLocalRangeSplitterOperator logicalSplitter = new IntervalLocalRangeSplitterOperator(splitKeys);
+        IntervalLocalRangeSplitterPOperator physicalSplitter =
+                new IntervalLocalRangeSplitterPOperator(splitKeys, rangeMap);
+        logicalSplitter.setPhysicalOperator(physicalSplitter);
+        logicalSplitter.setExecutionMode(mode);
+
+        // Wrap the split operator in an ExtensionOperator that uses the same physical operator.
+        ExtensionOperator wrapper = new ExtensionOperator(logicalSplitter);
+        wrapper.setPhysicalOperator(physicalSplitter);
+        wrapper.setExecutionMode(mode);
+        return wrapper;
+    }
+
+    /**
+     * Builds a replicate operator with the given number of outputs. All output materialization
+     * flags are left at {@code false}.
+     *
+     * @param outputArity
+     *            number of outputs of the replicate operator
+     * @param mode
+     *            execution mode assigned to the new operator
+     * @return the new replicate operator (no inputs or outputs attached yet)
+     */
+    private ReplicateOperator getReplicateOperator(int outputArity, ExecutionMode mode) {
+        ReplicateOperator replicate = new ReplicateOperator(outputArity, new boolean[outputArity]);
+        replicate.setPhysicalOperator(new ReplicatePOperator());
+        replicate.setExecutionMode(mode);
+        return replicate;
+    }
+
+    /**
+     * Builds an inner join on the given condition that is executed as a broadcast nested loop
+     * join.
+     *
+     * @param condition
+     *            join condition; the same mutable reference is handed to the new join
+     * @param context
+     *            optimization context supplying the maximum number of frames for a join
+     * @param mode
+     *            execution mode assigned to the new join
+     * @return the new join operator (no inputs attached yet)
+     */
+    private ILogicalOperator getNestedLoop(Mutable<ILogicalExpression> condition, IOptimizationContext context,
+            ExecutionMode mode) {
+        int frameLimit = context.getPhysicalOptimizationConfig().getMaxFramesForJoin();
+        InnerJoinOperator join = new InnerJoinOperator(condition);
+        join.setPhysicalOperator(
+                new NestedLoopJoinPOperator(JoinKind.INNER, JoinPartitioningType.BROADCAST, frameLimit));
+        join.setExecutionMode(mode);
+        return join;
+    }
+
+    /**
+     * Creates a new inner join that reuses the condition of {@code op} and carries a copy of its
+     * physical join operator. The copy takes its memory size from the optimization context.
+     *
+     * @param op
+     *            the join to copy
+     * @param context
+     *            optimization context supplying the maximum number of frames for a join
+     * @param mode
+     *            execution mode assigned to the new join
+     * @return the new join (no inputs attached yet), or {@code null} when {@code op} is not an
+     *         inner join or its physical operator is not a merge join, interval index join or
+     *         interval partition join
+     */
+    private ILogicalOperator getIntervalJoin(ILogicalOperator op, IOptimizationContext context, ExecutionMode mode) {
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return null;
+        }
+        InnerJoinOperator original = (InnerJoinOperator) op;
+        InnerJoinOperator copy = new InnerJoinOperator(original.getCondition());
+
+        int frameLimit = context.getPhysicalOptimizationConfig().getMaxFramesForJoin();
+        IPhysicalOperator physical = original.getPhysicalOperator();
+        PhysicalOperatorTag physicalTag = physical.getOperatorTag();
+
+        IPhysicalOperator physicalCopy;
+        if (physicalTag == PhysicalOperatorTag.MERGE_JOIN) {
+            MergeJoinPOperator merge = (MergeJoinPOperator) physical;
+            physicalCopy = new MergeJoinPOperator(merge.getKind(), merge.getPartitioningType(),
+                    merge.getKeysLeftBranch(), merge.getKeysRightBranch(), frameLimit,
+                    merge.getMergeJoinCheckerFactory(), merge.getRangeMap());
+        } else if (physicalTag != PhysicalOperatorTag.EXTENSION_OPERATOR) {
+            return null;
+        } else if (physical instanceof IntervalIndexJoinPOperator) {
+            IntervalIndexJoinPOperator index = (IntervalIndexJoinPOperator) physical;
+            physicalCopy = new IntervalIndexJoinPOperator(index.getKind(), index.getPartitioningType(),
+                    index.getKeysLeftBranch(), index.getKeysRightBranch(), frameLimit,
+                    index.getIntervalMergeJoinCheckerFactory(), index.getRangeMap());
+        } else if (physical instanceof IntervalPartitionJoinPOperator) {
+            IntervalPartitionJoinPOperator partition = (IntervalPartitionJoinPOperator) physical;
+            physicalCopy = new IntervalPartitionJoinPOperator(partition.getKind(),
+                    partition.getPartitioningType(), partition.getKeysLeftBranch(),
+                    partition.getKeysRightBranch(), frameLimit, partition.getBuildTupleCount(),
+                    partition.getProbeTupleCount(), partition.getBuildMaxDuration(),
+                    partition.getProbeMaxDuration(), partition.getAvgTuplesInFrame(),
+                    partition.getIntervalMergeJoinCheckerFactory(), partition.getRangeMap());
+        } else {
+            // Some other extension operator: not a join this rule knows how to copy.
+            return null;
+        }
+        copy.setPhysicalOperator(physicalCopy);
+        copy.setExecutionMode(mode);
+        return copy;
+    }
+
+    /**
+     * Builds a union-all operator in which each given variable is mapped to itself for both
+     * inputs and for the output.
+     *
+     * @param localLiveVars
+     *            variables to carry through the union
+     * @param mode
+     *            execution mode assigned to the new operator
+     * @return the new union-all operator (no inputs attached yet)
+     */
+    private ILogicalOperator getUnionOperator(Set<LogicalVariable> localLiveVars, ExecutionMode mode) {
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> identityMapping = new ArrayList<>();
+        for (LogicalVariable variable : localLiveVars) {
+            identityMapping.add(new Triple<>(variable, variable, variable));
+        }
+        UnionAllOperator union = new UnionAllOperator(identityMapping);
+        union.setPhysicalOperator(new UnionAllPOperator());
+        union.setExecutionMode(mode);
+        return union;
+    }
+
+    /**
+     * Tells whether an operator is a candidate join for this rule: an inner join whose physical
+     * operator is tagged as a merge join or as an extension operator.
+     *
+     * @param op
+     *            the operator to test
+     * @return {@code true} for a candidate join, {@code false} otherwise
+     */
+    private boolean isIntervalJoin(ILogicalOperator op) {
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return false;
+        }
+        // TODO add check for condition.
+        PhysicalOperatorTag physicalTag = ((InnerJoinOperator) op).getPhysicalOperator().getOperatorTag();
+        return physicalTag == PhysicalOperatorTag.MERGE_JOIN
+                || physicalTag == PhysicalOperatorTag.EXTENSION_OPERATOR;
+    }
+
+    /**
+     * Returns the range map of a SPLIT range partitioning exchange.
+     *
+     * @param op
+     *            the operator to inspect
+     * @return the range map of the exchange, or {@code null} when {@code op} is not an exchange,
+     *         is not a range partition exchange, or its range type is not SPLIT
+     */
+    private IRangeMap getRangeMapForBranch(ILogicalOperator op) {
+        if (op.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+            IPhysicalOperator physical = ((ExchangeOperator) op).getPhysicalOperator();
+            if (physical.getOperatorTag() == PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE) {
+                RangePartitionExchangePOperator rangeExchange = (RangePartitionExchangePOperator) physical;
+                if (rangeExchange.getRangeType() == RangePartitioningType.SPLIT) {
+                    return rangeExchange.getRangeMap();
+                }
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 0d8d7bf..21e07a5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -19,9 +19,6 @@
 
 package org.apache.asterix.runtime.operators.joins.intervalpartition;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 1a7e224..da85612 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -52,7 +52,7 @@ public abstract class AbstractLogicalOperator implements 
ILogicalOperator {
      * partition and only processes data from that partition
      */
 
-    public static enum ExecutionMode {
+    public enum ExecutionMode {
         UNPARTITIONED,
         PARTITIONED,
         LOCAL
@@ -60,15 +60,14 @@ public abstract class AbstractLogicalOperator implements 
ILogicalOperator {
 
     private AbstractLogicalOperator.ExecutionMode mode = 
AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
     protected IPhysicalOperator physicalOperator;
-    private final Map<String, Object> annotations = new HashMap<String, 
Object>();
+    private final Map<String, Object> annotations = new HashMap<>();
     private boolean bJobGenEnabled = true;
 
-    final protected List<Mutable<ILogicalOperator>> inputs;
-    // protected List<LogicalOperatorReference> outputs;
+    protected final List<Mutable<ILogicalOperator>> inputs;
     protected List<LogicalVariable> schema;
 
     public AbstractLogicalOperator() {
-        inputs = new ArrayList<Mutable<ILogicalOperator>>();
+        inputs = new ArrayList<>();
     }
 
     @Override
@@ -134,11 +133,6 @@ public abstract class AbstractLogicalOperator implements 
ILogicalOperator {
         return inputs;
     }
 
-    // @Override
-    // public final List<LogicalOperatorReference> getOutputs() {
-    // return outputs;
-    // }
-
     @Override
     public final boolean hasInputs() {
         return !inputs.isEmpty();
@@ -161,7 +155,7 @@ public abstract class AbstractLogicalOperator implements 
ILogicalOperator {
     @Override
     public final void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         if (bJobGenEnabled) {
             if (physicalOperator == null) {
                 throw new AlgebricksException("Physical operator not set for 
operator: " + this);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 343ace8..834107c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -34,20 +34,20 @@ import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit
 
 public class ReplicateOperator extends AbstractLogicalOperator {
 
-    private int outputArity = 2;
-    private boolean[] outputMaterializationFlags = new boolean[outputArity];
+    private int outputArity;
+    private boolean[] outputMaterializationFlags;
     private List<Mutable<ILogicalOperator>> outputs;
 
     public ReplicateOperator(int outputArity) {
         this.outputArity = outputArity;
         this.outputMaterializationFlags = new boolean[outputArity];
-        this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
+        this.outputs = new ArrayList<>();
     }
 
     public ReplicateOperator(int outputArity, boolean[] 
outputMaterializationFlags) {
         this.outputArity = outputArity;
         this.outputMaterializationFlags = outputMaterializationFlags;
-        this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
+        this.outputs = new ArrayList<>();
     }
 
     @Override
@@ -89,10 +89,6 @@ public class ReplicateOperator extends 
AbstractLogicalOperator {
         return outputArity;
     }
 
-    public int setOutputArity(int outputArity) {
-        return this.outputArity = outputArity;
-    }
-
     public void setOutputMaterializationFlags(boolean[] 
outputMaterializationFlags) {
         this.outputMaterializationFlags = outputMaterializationFlags;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
index 04c34b8..51f54f6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
@@ -75,6 +75,22 @@ public class MergeJoinPOperator extends 
AbstractJoinPOperator {
                 + mjcf + ", IRangeMap rangeMap=" + rangeMap + ".");
     }
 
+    /**
+     * @return the {@code keysLeftBranch} list held by this operator (presumably the join key
+     *         variables of the left input; the list itself is returned, not a copy)
+     */
+    public List<LogicalVariable> getKeysLeftBranch() {
+        return keysLeftBranch;
+    }
+
+    /**
+     * @return the {@code keysRightBranch} list held by this operator (presumably the join key
+     *         variables of the right input; the list itself is returned, not a copy)
+     */
+    public List<LogicalVariable> getKeysRightBranch() {
+        return keysRightBranch;
+    }
+
+    /**
+     * @return the merge join checker factory ({@code mjcf}) this operator was configured with
+     */
+    public IMergeJoinCheckerFactory getMergeJoinCheckerFactory() {
+        return mjcf;
+    }
+
+    /**
+     * @return the range map this operator was configured with
+     */
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.MERGE_JOIN;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/aea7fe87/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
deleted file mode 100644
index 5384347..0000000
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
-
-/**
- * The right input is broadcast and the left input can be partitioned in any way.
- */
-public class NLJoinPOperator extends AbstractJoinPOperator {
-
-    private final int memSize;
-
-    public NLJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
-        super(kind, partitioningType);
-        this.memSize = memSize;
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.NESTED_LOOP;
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
-        if (partitioningType != JoinPartitioningType.BROADCAST) {
-            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
-        }
-
-        IPartitioningProperty pp;
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
-
-        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
-            IPhysicalPropertiesVector pv1 = op2.getPhysicalOperator().getDeliveredProperties();
-            if (pv1 == null) {
-                pp = null;
-            } else {
-                pp = pv1.getPartitioningProperty();
-            }
-        } else {
-            pp = IPartitioningProperty.UNPARTITIONED;
-        }
-
-        // Nested loop join cannot maintain the local structure property for the probe side
-        // because of the I/O optimization for the build branch.
-        this.deliveredProperties = new StructuralPropertiesVector(pp, null);
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        if (partitioningType != JoinPartitioningType.BROADCAST) {
-            throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
-        }
-
-        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
-
-        // TODO: leverage statistics to make better decisions.
-        pv[0] = new StructuralPropertiesVector(null, null);
-        pv[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
-                null);
-        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
-                propagatedSchema, context);
-        IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
-        conditionInputSchemas[0] = propagatedSchema;
-        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory cond = expressionRuntimeProvider.createEvaluatorFactory(join.getCondition().getValue(),
-                context.getTypeEnvironment(op), conditionInputSchemas, context);
-        ITuplePairComparatorFactory comparatorFactory = new TuplePairEvaluatorFactory(cond,
-                context.getBinaryBooleanInspectorFactory());
-        IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        IOperatorDescriptor opDesc = null;
-
-        switch (kind) {
-            case INNER: {
-                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
-                        null);
-                break;
-            }
-            case LEFT_OUTER: {
-                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
-                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                }
-                opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
-                        nonMatchWriterFactories);
-                break;
-            }
-            default: {
-                throw new NotImplementedException();
-            }
-        }
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-
-        ILogicalOperator src1 = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src1, 0, op, 0);
-        ILogicalOperator src2 = op.getInputs().get(1).getValue();
-        builder.contributeGraphEdge(src2, 0, op, 1);
-    }
-
-    public static class TuplePairEvaluatorFactory implements ITuplePairComparatorFactory {
-
-        private static final long serialVersionUID = 1L;
-        private final IScalarEvaluatorFactory cond;
-        private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
-
-        public TuplePairEvaluatorFactory(IScalarEvaluatorFactory cond,
-                IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
-            this.cond = cond;
-            this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
-        }
-
-        @Override
-        public synchronized ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
-            return new TuplePairEvaluator(ctx, cond, binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx));
-        }
-    }
-
-    public static class TuplePairEvaluator implements ITuplePairComparator {
-        private final IHyracksTaskContext ctx;
-        private IScalarEvaluator condEvaluator;
-        private final IScalarEvaluatorFactory condFactory;
-        private final IPointable p;
-        private final CompositeFrameTupleReference compositeTupleRef;
-        private final FrameTupleReference leftRef;
-        private final FrameTupleReference rightRef;
-        private final IBinaryBooleanInspector binaryBooleanInspector;
-
-        public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
-                IBinaryBooleanInspector binaryBooleanInspector) {
-            this.ctx = ctx;
-            this.condFactory = condFactory;
-            this.binaryBooleanInspector = binaryBooleanInspector;
-            this.leftRef = new FrameTupleReference();
-            this.p = VoidPointable.FACTORY.createPointable();
-            this.rightRef = new FrameTupleReference();
-            this.compositeTupleRef = new CompositeFrameTupleReference(leftRef, rightRef);
-        }
-
-        @Override
-        public int compare(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
-                int innerIndex) throws HyracksDataException {
-            if (condEvaluator == null) {
-                try {
-                    this.condEvaluator = condFactory.createScalarEvaluator(ctx);
-                } catch (AlgebricksException ae) {
-                    throw new HyracksDataException(ae);
-                }
-            }
-            compositeTupleRef.reset(outerAccessor, outerIndex, innerAccessor, innerIndex);
-            try {
-                condEvaluator.evaluate(compositeTupleRef, p);
-            } catch (AlgebricksException ae) {
-                throw new HyracksDataException(ae);
-            }
-            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
-                    p.getLength());
-            if (result) {
-                return 0;
-            } else {
-                return 1;
-            }
-        }
-    }
-
-    public static class CompositeFrameTupleReference implements IFrameTupleReference {
-
-        private final FrameTupleReference refLeft;
-        private final FrameTupleReference refRight;
-
-        public CompositeFrameTupleReference(FrameTupleReference refLeft, FrameTupleReference refRight) {
-            this.refLeft = refLeft;
-            this.refRight = refRight;
-        }
-
-        public void reset(IFrameTupleAccessor outerAccessor, int outerIndex, IFrameTupleAccessor innerAccessor,
-                int innerIndex) {
-            refLeft.reset(outerAccessor, outerIndex);
-            refRight.reset(innerAccessor, innerIndex);
-        }
-
-        @Override
-        public int getFieldCount() {
-            return refLeft.getFieldCount() + refRight.getFieldCount();
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldData(fIdx);
-            } else {
-                return refRight.getFieldData(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldStart(fIdx);
-            } else {
-                return refRight.getFieldStart(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount) {
-                return refLeft.getFieldLength(fIdx);
-            } else {
-                return refRight.getFieldLength(fIdx - leftFieldCount);
-            }
-        }
-
-        @Override
-        public IFrameTupleAccessor getFrameTupleAccessor() {
-            throw new NotImplementedException();
-        }
-
-        @Override
-        public int getTupleIndex() {
-            throw new NotImplementedException();
-        }
-
-    }
-}

Reply via email to