Range forward operators.

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/933c1307
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/933c1307
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/933c1307

Branch: refs/heads/ecarm002/interval_join_merge
Commit: 933c13078397f07e7528908187f06d7c5c018acd
Parents: 5b102cc
Author: Preston Carman <prest...@apache.org>
Authored: Wed Jul 27 12:14:21 2016 -0700
Committer: Preston Carman <prest...@apache.org>
Committed: Wed Jul 27 12:14:21 2016 -0700

----------------------------------------------------------------------
 .../core/algebra/base/LogicalOperatorTag.java   |   1 +
 .../core/algebra/base/PhysicalOperatorTag.java  |   1 +
 .../operators/logical/RangeForwardOperator.java |  80 +++++++++++++
 .../visitors/CardinalityInferenceVisitor.java   |   6 +
 .../visitors/FDsAndEquivClassesVisitor.java     |   7 ++
 .../visitors/IsomorphismOperatorVisitor.java    |  10 ++
 .../IsomorphismVariableMappingVisitor.java      |   7 ++
 ...OperatorDeepCopyWithNewVariablesVisitor.java |  10 ++
 .../visitors/LogicalPropertiesVisitor.java      |   7 ++
 .../visitors/OperatorDeepCopyVisitor.java       |  25 ++--
 .../visitors/PrimaryKeyVariablesVisitor.java    |   7 ++
 .../visitors/ProducedVariableVisitor.java       |   6 +
 .../logical/visitors/SchemaVariableVisitor.java |   6 +
 .../visitors/SubstituteVariableVisitor.java     |   7 ++
 .../logical/visitors/UsedVariableVisitor.java   |   6 +
 .../physical/RangeForwardPOperator.java         |  85 +++++++++++++
 .../LogicalOperatorPrettyPrintVisitor.java      |   9 ++
 .../visitors/ILogicalOperatorVisitor.java       |   3 +
 .../misc/RangeForwardOperatorDescriptor.java    | 119 +++++++++++++++++++
 19 files changed, 393 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index d39a8c8..07c35a3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -40,6 +40,7 @@ public enum LogicalOperatorTag {
     ORDER,
     PARTITIONINGSPLIT,
     PROJECT,
+    RANGE_FORWARD,
     REPLICATE,
     RUNNINGAGGREGATE,
     SCRIPT,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 7f753a5..8ae6602 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -52,6 +52,7 @@ public enum PhysicalOperatorTag {
     PRE_SORTED_DISTINCT_BY,
     RANDOM_PARTITION_EXCHANGE,
     RANDOM_MERGE_EXCHANGE,
+    RANGE_FORWARD,
     RANGE_PARTITION_EXCHANGE,
     RANGE_PARTITION_MERGE_EXCHANGE,
     RTREE_SEARCH,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
new file mode 100644
index 0000000..5a3bc98
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+/** Logical operator tagged {@code RANGE_FORWARD}; carries an {@link IRangeMap} and copies its first input's schema. */
+public class RangeForwardOperator extends AbstractLogicalOperator {
+    /** Range map supplied at construction; declared final because nothing reassigns it. */
+    private final IRangeMap rangeMap;
+    /** @param rangeMap range map to carry; stored as given (no copy, no null check) */
+    public RangeForwardOperator(IRangeMap rangeMap) {
+        this.rangeMap = rangeMap;
+    }
+    /** @return the same range map instance that was passed to the constructor */
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+    /** @return {@link LogicalOperatorTag#RANGE_FORWARD} */
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.RANGE_FORWARD;
+    }
+    /** @return {@link VariablePropagationPolicy#ALL} */
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+    /** Always returns {@code false} and never calls {@code visitor}; this class declares no expression fields. */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        return false;
+    }
+    /** Dispatches to {@code visitor.visitRangeForwardOperator(this, arg)} and returns its result. */
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitRangeForwardOperator(this, arg);
+    }
+    /** Delegates to {@code createPropagatingAllInputsTypeEnvironment(ctx)}. */
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+    /** @return always {@code false} */
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+    /** Replaces {@code schema} with a new list copied from the schema of input 0. */
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<>(inputs.get(0).getValue().getSchema());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 8d74de3..58f6e5d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -145,6 +146,11 @@ public class CardinalityInferenceVisitor implements 
ILogicalOperatorVisitor<Long
     }
 
     @Override
+    public Long visitRangeForwardOperator(RangeForwardOperator op, Void arg) 
throws AlgebricksException { // result is whatever this visitor yields for the first input
+        return op.getInputs().get(0).getValue().accept(this, arg); // forward the visitor to input 0 unchanged
+    }
+
+    @Override
     public Long visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return op.getInputs().get(0).getValue().accept(this, arg);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 85c55ec..83b5798 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -68,6 +68,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -419,6 +420,12 @@ public class FDsAndEquivClassesVisitor implements 
ILogicalOperatorVisitor<Void,
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, 
IOptimizationContext ctx) throws AlgebricksException { // all work is done by propagateFDsAndEquivClasses
+        propagateFDsAndEquivClasses(op, ctx);
+        return null; // Void visitor: there is no result value to hand back
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c0e8b34..737b246 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -298,6 +299,15 @@ public class IsomorphismOperatorVisitor implements 
ILogicalOperatorVisitor<Boole
     }
 
     @Override
+    public Boolean visitRangeForwardOperator(RangeForwardOperator op, 
ILogicalOperator arg) throws AlgebricksException { // TRUE exactly when arg is also tagged RANGE_FORWARD
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.RANGE_FORWARD) {
+            return Boolean.FALSE;
+        }
+        return Boolean.TRUE; // NOTE(review): op.getRangeMap() is never compared with arg's range map - confirm intended
+    }
+
+    @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 3db6af9..dbac3fb 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -176,6 +177,12 @@ public class IsomorphismVariableMappingVisitor implements 
ILogicalOperatorVisito
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, 
ILogicalOperator arg) throws AlgebricksException { // all work is done by mapVariablesStandard
+        mapVariablesStandard(op, arg);
+        return null; // Void visitor: there is no result value to hand back
+    }
+
+   @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 2c4dacf..8cb739a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -55,6 +55,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -406,6 +407,15 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     }
 
     @Override
+    public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, 
ILogicalOperator arg)
+            throws AlgebricksException {
+        // TODO fix deep copy of range map: the copy below is built with op's own IRangeMap instance
+        RangeForwardOperator opCopy = new 
RangeForwardOperator(op.getRangeMap());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); // remaining copy work is done by this helper
+        return opCopy;
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
ILogicalOperator arg)
             throws AlgebricksException {
         MaterializeOperator opCopy = new MaterializeOperator();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8d3644d..58b2cd7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -49,6 +49,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -193,6 +194,12 @@ public class LogicalPropertiesVisitor implements 
ILogicalOperatorVisitor<Void, I
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, 
IOptimizationContext arg) throws AlgebricksException {
+        // TODO Auto-generated method stub: nothing is done for this operator yet, the body only returns null
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 99123b3..6337187 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -55,6 +56,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -294,9 +296,9 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
         deepCopyExpressionRefs(newKeyExpressions, 
op.getPrimaryKeyExpressions());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = 
new ArrayList<>();
         deepCopyExpressionRefs(newKeyExpressions, 
op.getAdditionalFilteringExpressions());
-        InsertDeleteUpsertOperator insertDeleteOp =
-                new InsertDeleteUpsertOperator(op.getDataSource(), 
deepCopyExpressionRef(op.getPayloadExpression()),
-                        newKeyExpressions, op.getOperation(), op.isBulkload());
+        InsertDeleteUpsertOperator insertDeleteOp = new 
InsertDeleteUpsertOperator(op.getDataSource(),
+                deepCopyExpressionRef(op.getPayloadExpression()), 
newKeyExpressions, op.getOperation(),
+                op.isBulkload());
         
insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
         return insertDeleteOp;
     }
@@ -308,8 +310,8 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
         deepCopyExpressionRefs(newPrimaryKeyExpressions, 
op.getPrimaryKeyExpressions());
         List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new 
ArrayList<>();
         deepCopyExpressionRefs(newSecondaryKeyExpressions, 
op.getSecondaryKeyExpressions());
-        Mutable<ILogicalExpression> newFilterExpression =
-                new MutableObject<>(((AbstractLogicalExpression) 
op.getFilterExpression()).cloneExpression());
+        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<>(
+                ((AbstractLogicalExpression) 
op.getFilterExpression()).cloneExpression());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = 
new ArrayList<>();
         deepCopyExpressionRefs(newLSMComponentFilterExpressions, 
op.getAdditionalFilteringExpressions());
         IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new 
IndexInsertDeleteUpsertOperator(
@@ -327,8 +329,8 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
         deepCopyExpressionRefs(newSecondaryKeyExpressions, 
op.getSecondaryKeyExpressions());
         List<LogicalVariable> newTokenizeVars = new ArrayList<>();
         deepCopyVars(newTokenizeVars, op.getTokenizeVars());
-        Mutable<ILogicalExpression> newFilterExpression =
-                new MutableObject<>(((AbstractLogicalExpression) 
op.getFilterExpression()).cloneExpression());
+        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<>(
+                ((AbstractLogicalExpression) 
op.getFilterExpression()).cloneExpression());
         List<Object> newTokenizeVarTypes = new ArrayList<>();
         deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
 
@@ -372,8 +374,8 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
         return newObjs;
     }
 
-    private List<Pair<IOrder, Mutable<ILogicalExpression>>>
-            deepCopyOrderAndExpression(List<Pair<IOrder, 
Mutable<ILogicalExpression>>> ordersAndExprs) {
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> 
deepCopyOrderAndExpression(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
         List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = 
new ArrayList<>();
         for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs) {
             newOrdersAndExprs.add(new Pair<>(pair.first, 
deepCopyExpressionRef(pair.second)));
@@ -387,6 +389,11 @@ public class OperatorDeepCopyVisitor implements 
ILogicalOperatorVisitor<ILogical
     }
 
     @Override
+    public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, 
Void arg) throws AlgebricksException { // returns a new operator; arg is unused
+        return new RangeForwardOperator(op.getRangeMap()); // same IRangeMap instance - the map itself is not copied
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, 
Void arg) throws AlgebricksException {
         return new MaterializeOperator();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f01b20f..f3d2990 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -51,6 +51,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -185,6 +186,12 @@ public class PrimaryKeyVariablesVisitor implements 
ILogicalOperatorVisitor<Void,
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, 
IOptimizationContext ctx)
+            throws AlgebricksException {
+        return null; // no-op: neither op nor ctx is touched for this operator
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
IOptimizationContext ctx) throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 10659b1..1a2c754 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -55,6 +55,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -255,6 +256,11 @@ public class ProducedVariableVisitor implements 
ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, Void arg) 
throws AlgebricksException {
+        return null; // no-op: this visitor records nothing for a range forward operator
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index d35153a..873b847 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -53,6 +53,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -288,6 +289,14 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
+        // Range forward relays its input, so give it the same standard layout that
+        // visitMaterializeOperator uses instead of returning without any layout.
+        standardLayout(op);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         standardLayout(op);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index f2da7c4..3623221 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -54,6 +54,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -414,6 +415,12 @@ public class SubstituteVariableVisitor
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, 
Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null; // no-op: the variable pair in arg is not applied to a range forward operator
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, 
Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e966406..8f88685 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -54,6 +54,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -440,6 +441,11 @@ public class UsedVariableVisitor implements 
ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
+    public Void visitRangeForwardOperator(RangeForwardOperator op, Void arg) 
throws AlgebricksException {
+        return null; // no-op: nothing is collected from a range forward operator
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
new file mode 100644
index 0000000..2324c5e
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor;
+
+/**
+ * Physical operator that turns a range forward step into a {@link RangeForwardOperatorDescriptor} built
+ * around a fixed {@link IRangeMap}.
+ */
+public class RangeForwardPOperator extends AbstractPhysicalOperator {
+
+    /** Range map handed to the runtime descriptor; assigned once, in the constructor. */
+    private final IRangeMap rangeMap;
+
+    /**
+     * @param rangeMap
+     *            range map passed to the {@link RangeForwardOperatorDescriptor} during job generation
+     */
+    public RangeForwardPOperator(IRangeMap rangeMap) {
+        this.rangeMap = rangeMap;
+    }
+
+    /**
+     * @return the range map supplied at construction
+     */
+    public IRangeMap getRangeMap() {
+        return rangeMap;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANGE_FORWARD;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    /**
+     * Sets the delivered properties to a clone of those delivered by the first input operator.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator input = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) input.getDeliveredPhysicalProperties().clone();
+    }
+
+    /**
+     * Always answers with {@code emptyUnaryRequirements()}, whatever the parent requires.
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    /**
+     * Registers a {@link RangeForwardOperatorDescriptor} for {@code op} and adds the graph edge from the first
+     * input operator (index 0) to {@code op} (index 0).
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeMap);
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index f961bd9..b649b13 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -52,6 +52,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -367,6 +368,13 @@ public class LogicalOperatorPrettyPrintVisitor implements 
ILogicalOperatorVisito
     }
 
     @Override
+    public String visitRangeForwardOperator(RangeForwardOperator op, Integer 
indent) throws AlgebricksException {
+        StringBuilder buffer = new StringBuilder();
+        addIndent(buffer, indent).append("range forward "); // operator label only; no fields of op are printed
+        return buffer.toString();
+    }
+
+    @Override
     public String visitMaterializeOperator(MaterializeOperator op, Integer 
indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("materialize ");
@@ -515,4 +523,5 @@ public class LogicalOperatorPrettyPrintVisitor implements 
ILogicalOperatorVisito
         }
         sb.append("]");
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c0f9718..0548ab3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -41,6 +41,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperato
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -86,6 +87,8 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitReplicateOperator(ReplicateOperator op, T arg) throws 
AlgebricksException;
 
+    public R visitRangeForwardOperator(RangeForwardOperator op, T arg) throws 
AlgebricksException;
+
     public R visitMaterializeOperator(MaterializeOperator op, T arg) throws 
AlgebricksException;
 
     public R visitScriptOperator(ScriptOperator op, T arg) throws 
AlgebricksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/933c1307/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
new file mode 100644
index 0000000..15d91be
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.sort.ISorter;
+
+/**
+ * Operator descriptor with one input and one output. Its single activity registers a
+ * {@link RangeForwardTaskState} holding the {@link IRangeMap} when opened, then hands every incoming frame to
+ * {@code FrameUtils.flushFrame} together with the output writer.
+ */
+public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private static final int RANGE_FORWARD_ACTIVITY_ID = 0;
+    // NOTE(review): not referenced anywhere in this class; presumably reserved for a later activity - confirm.
+    private static final int RANGE_WRITER_ACTIVITY_ID = 1;
+
+    private final IRangeMap rangeMap;
+
+    /**
+     * @param spec
+     *            registry passed to the super constructor
+     * @param rangeMap
+     *            range map registered as task state by each forwarding task
+     */
+    public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, IRangeMap rangeMap) {
+        super(spec, 1, 1);
+        this.rangeMap = rangeMap;
+    }
+
+    /**
+     * Adds one {@code ForwardActivityNode} and binds it to source edge 0 and target edge 0.
+     */
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ActivityId forwardId = new ActivityId(odId, RANGE_FORWARD_ACTIVITY_ID);
+        ForwardActivityNode forwardActivity = new ForwardActivityNode(forwardId);
+        builder.addActivity(this, forwardActivity);
+        builder.addSourceEdge(0, forwardActivity, 0);
+        builder.addTargetEdge(0, forwardActivity, 0);
+    }
+
+    /**
+     * State object that carries the range map; it is created for a given job id and task id.
+     */
+    public static class RangeForwardTaskState extends AbstractStateObject {
+        private final IRangeMap rangeMap;
+
+        public RangeForwardTaskState(JobId jobId, TaskId taskId, IRangeMap rangeMap) {
+            super(jobId, taskId);
+            this.rangeMap = rangeMap;
+        }
+
+        public IRangeMap getRangeMap() {
+            return rangeMap;
+        }
+    }
+
+    /**
+     * Activity whose push runtime registers the task state on open and relays frames to the output writer.
+     */
+    private final class ForwardActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public ForwardActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                @Override
+                public void open() throws HyracksDataException {
+                    // Register the range map for this task before the output is opened.
+                    JobId jobId = ctx.getJobletContext().getJobId();
+                    TaskId taskId = new TaskId(getActivityId(), partition);
+                    ctx.setStateObject(new RangeForwardTaskState(jobId, taskId, rangeMap));
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    FrameUtils.flushFrame(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    writer.close();
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    writer.fail();
+                }
+            };
+        }
+    }
+}

Reply via email to