http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java new file mode 100644 index 0000000..db11712 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.logical; + +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy; +import org.apache.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer; +import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; +import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer; +import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; + +/** + * Forward operator is used to forward data to different NCs based on a range map that is computed dynamically + * by doing a pass over the data itself to infer the range map. The operator takes two inputs: + * 1. Tuples/data (at index 0). The data is forwarded to the range-based connector which routes it to the target NC. + * 2. Range map (at index 1). The range map will be stored in Hyracks context, and the connector will pick it up. + * Forward operator will receive the range map when it is broadcast by the operator generating the range map after which + * the forward operator will start forwarding the data. + */ +public class ForwardOperator extends AbstractLogicalOperator { + + private final String rangeMapKey; + private final Mutable<ILogicalExpression> rangeMapExpression; + + public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression> rangeMapExpression) { + super(); + this.rangeMapKey = rangeMapKey; + this.rangeMapExpression = rangeMapExpression; + } + + public String getRangeMapKey() { + return rangeMapKey; + } + + public Mutable<ILogicalExpression> getRangeMapExpression() { + return rangeMapExpression; + } + + @Override + public LogicalOperatorTag getOperatorTag() { + return LogicalOperatorTag.FORWARD; + } + + @Override + public void recomputeSchema() throws AlgebricksException { + // schema is equal to the schema of the data source at idx 0 + setSchema(inputs.get(0).getValue().getSchema()); + } + + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException { + return visitor.transform(rangeMapExpression); + } + + @Override + public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException { + return visitor.visitForwardOperator(this, arg); + } + + @Override + public boolean isMap() { + return false; + } + + @Override + public VariablePropagationPolicy getVariablePropagationPolicy() { + return new VariablePropagationPolicy() { + + @Override + public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) + throws AlgebricksException { + // propagate the variables of the data source at idx 0 + if (sources.length > 0) { + target.addAllVariables(sources[0]); + } + } + }; + } + + @Override + public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException { + // propagate the type environment of the data source at idx 0 + ITypeEnvPointer[] envPointers = new ITypeEnvPointer[] { new OpRefTypeEnvPointer(inputs.get(0), ctx) }; + return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMissableTypeComputer(), + ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers); + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java index d0aea60..9d853eb 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java @@ -31,11 +31,12 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -276,6 +277,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long } @Override + public Long visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + return op.getInputs().get(0).getValue().accept(this, arg); + } + + @Override public Long visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { long cardinality = UNKNOWN; for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java index d0d121f..16fc1ed 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java @@ -53,6 +53,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -560,6 +561,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void, } @Override + public Void visitForwardOperator(ForwardOperator op, IOptimizationContext ctx) throws AlgebricksException { + propagateFDsAndEquivClasses(op, ctx); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException { setEmptyFDsEqClasses(op, ctx); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java index 90c0067..2b5e569 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -595,6 +596,18 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole } @Override + public Boolean visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException { + AbstractLogicalOperator argOperator = (AbstractLogicalOperator) arg; + if (argOperator.getOperatorTag() != LogicalOperatorTag.FORWARD) { + return Boolean.FALSE; + } + ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op, arg); + ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue(); + ILogicalExpression otherRangeMapExp = otherOp.getRangeMapExpression().getValue(); + return rangeMapExp.equals(otherRangeMapExp) && op.getRangeMapKey().equals(otherOp.getRangeMapKey()); + } + + @Override public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException { return true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java index 2caa252..742d485 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -284,6 +285,12 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito } @Override + public Void visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException { + mapVariablesStandard(op, arg); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException { mapVariablesStandard(op, arg); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index e0210cc..0196db6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -587,6 +588,14 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor } @Override + public ILogicalOperator visitForwardOperator(ForwardOperator op, ILogicalOperator arg) throws AlgebricksException { + ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(), + exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression())); + deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); + return opCopy; + } + + @Override public ILogicalOperator visitDelegateOperator(DelegateOperator op, ILogicalOperator arg) throws AlgebricksException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java index 6dfe254..7d3d676 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java @@ -30,11 +30,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractAssi import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -289,6 +290,12 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I } @Override + public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException { + // TODO Auto-generated method stub + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, IOptimizationContext arg) throws AlgebricksException { // TODO Auto-generated method stub return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java index 0db0f74..c6f0c14 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java @@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -342,6 +343,11 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical } @Override + public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + return new ForwardOperator(op.getRangeMapKey(), deepCopyExpressionRef(op.getRangeMapExpression())); + } + + @Override public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { return new SinkOperator(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java index c96276f..f36f604 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java @@ -32,11 +32,12 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -278,6 +279,11 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void, } @Override + public Void visitForwardOperator(ForwardOperator op, IOptimizationContext arg) throws AlgebricksException { + return null; + } + + @Override public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, IOptimizationContext arg) throws AlgebricksException { return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java index ec96d48..5d0ef6a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java @@ -36,11 +36,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnne import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -278,6 +279,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo } @Override + public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java index 59ccd84..70ccf6d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java @@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -318,6 +319,13 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void } @Override + public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + // only consider variables from the branch of the data source + VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), schemaVariables); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException { standardLayout(op); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java index 3587e29..c62f555 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java @@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -460,6 +461,14 @@ public class SubstituteVariableVisitor } @Override + public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg) + throws AlgebricksException { + op.getRangeMapExpression().getValue().substituteVar(arg.first, arg.second); + substVarTypes(op, arg); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException { return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java index e66809e..2c68697 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java @@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -138,50 +139,43 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> switch (physOp.getOperatorTag()) { case BROADCAST_EXCHANGE: case ONE_TO_ONE_EXCHANGE: - case RANDOM_MERGE_EXCHANGE: { + case RANDOM_MERGE_EXCHANGE: + case SEQUENTIAL_MERGE_EXCHANGE: // No variables used. break; - } - case HASH_PARTITION_EXCHANGE: { - HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp; - usedVariables.addAll(concreteOp.getHashFields()); + case HASH_PARTITION_EXCHANGE: + HashPartitionExchangePOperator hashPartitionPOp = (HashPartitionExchangePOperator) physOp; + usedVariables.addAll(hashPartitionPOp.getHashFields()); break; - } - case HASH_PARTITION_MERGE_EXCHANGE: { - HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp; - usedVariables.addAll(concreteOp.getPartitionFields()); - for (OrderColumn orderCol : concreteOp.getOrderColumns()) { + case HASH_PARTITION_MERGE_EXCHANGE: + HashPartitionMergeExchangePOperator hashMergePOp = (HashPartitionMergeExchangePOperator) physOp; + usedVariables.addAll(hashMergePOp.getPartitionFields()); + for (OrderColumn orderCol : hashMergePOp.getOrderColumns()) { usedVariables.add(orderCol.getColumn()); } break; - } - case SORT_MERGE_EXCHANGE: { - SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp; - for (OrderColumn orderCol : concreteOp.getSortColumns()) { + case SORT_MERGE_EXCHANGE: + SortMergeExchangePOperator sortMergePOp = (SortMergeExchangePOperator) physOp; + for (OrderColumn orderCol : sortMergePOp.getSortColumns()) { usedVariables.add(orderCol.getColumn()); } break; - } - case RANGE_PARTITION_EXCHANGE: { - RangePartitionExchangePOperator concreteOp = (RangePartitionExchangePOperator) physOp; - for (OrderColumn partCol : concreteOp.getPartitioningFields()) { + case RANGE_PARTITION_EXCHANGE: + RangePartitionExchangePOperator rangePartitionPOp = (RangePartitionExchangePOperator) physOp; + for (OrderColumn partCol : rangePartitionPOp.getPartitioningFields()) { usedVariables.add(partCol.getColumn()); } break; - } - case RANGE_PARTITION_MERGE_EXCHANGE: { - RangePartitionMergeExchangePOperator concreteOp = (RangePartitionMergeExchangePOperator) physOp; - for (OrderColumn partCol : concreteOp.getPartitioningFields()) { + case RANGE_PARTITION_MERGE_EXCHANGE: + RangePartitionMergeExchangePOperator rangeMergePOp = (RangePartitionMergeExchangePOperator) physOp; + for (OrderColumn partCol : rangeMergePOp.getPartitioningFields()) { usedVariables.add(partCol.getColumn()); } break; - } - case RANDOM_PARTITION_EXCHANGE: { + case RANDOM_PARTITION_EXCHANGE: break; - } - default: { + default: throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'."); - } } } return null; @@ -439,6 +433,12 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> } @Override + public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException { + op.getRangeMapExpression().getValue().getUsedVariables(usedVariables); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Void arg) { return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java index 78e96a4..0c08369 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java @@ -19,6 +19,7 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,16 +30,20 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty; import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; @@ -67,16 +72,26 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat } @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop, - IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - AbstractLogicalOperator op = (AbstractLogicalOperator) iop; - if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator sortOp, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext ctx) { + if (sortOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) { if (orderProp == null) { - computeLocalProperties(op); + computeLocalProperties(sortOp); } - StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector( - IPartitioningProperty.UNPARTITIONED, Collections.singletonList(orderProp)) }; - return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION); + StructuralPropertiesVector[] requiredProp = new StructuralPropertiesVector[1]; + IPartitioningProperty partitioning; + INodeDomain targetNodeDomain = ctx.getComputationNodeDomain(); + if (isFullParallel((AbstractLogicalOperator) sortOp, targetNodeDomain, ctx)) { + // partitioning requirement: input data is re-partitioned on sort columns (global ordering) + // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here + partitioning = new OrderedPartitionedProperty(Arrays.asList(sortColumns), targetNodeDomain); + } else { + // partitioning requirement: input data is unpartitioned (i.e. must be merged at one site) + partitioning = IPartitioningProperty.UNPARTITIONED; + } + // local requirement: each partition must be locally ordered + requiredProp[0] = new StructuralPropertiesVector(partitioning, Collections.singletonList(orderProp)); + return new PhysicalRequirements(requiredProp, IPartitioningRequirementsCoordinator.NO_COORDINATION); } else { return emptyUnaryRequirements(); } @@ -123,4 +138,27 @@ public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperat public boolean expensiveThanMaterialization() { return true; } + + /** + * When true, the sort operator requires ORDERED_PARTITION (only applicable to dynamic version for now). + * Conditions: + * 1. Execution mode == partitioned + * 2. Dynamic range map was not disabled by some checks + * 3. User didn't disable it + * 4. User didn't provide static range map + * 5. Physical sort operator is not in-memory + * 6. There are at least two partitions in the cluster + * @param sortOp the sort operator + * @param clusterDomain the partitions specification of the cluster + * @param ctx optimization context + * @return true if the sort operator should be full parallel sort, false otherwise. + */ + private boolean isFullParallel(AbstractLogicalOperator sortOp, INodeDomain clusterDomain, + IOptimizationContext ctx) { + return sortOp.getAnnotations().get(OperatorAnnotations.USE_DYNAMIC_RANGE) != Boolean.FALSE + && !sortOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE) + && sortOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.STABLE_SORT + && clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1 + && ctx.getPhysicalOptimizationConfig().getSortParallel(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java new file mode 100644 index 0000000..11c584e --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor; + +/** + * <pre> + * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}} + * idx0: Input data source -- + * |-- forward op. + * idx1: RangeMap generator-- + * </pre> + */ +public class ForwardPOperator extends AbstractPhysicalOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.FORWARD; + } + + /** + * Forward operator requires that the global aggregate operator broadcasts the range map. No required properties at + * the data source input. + * @param op {@see {@link ForwardOperator}} + * @param requiredByParent parent's requirements, which are not enforced for now, as we only explore one plan + * @param context the optimization context + * @return broadcast requirement at input 1; empty requirements at input 0; No coordination between the two. + */ + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) { + // broadcast the range map to the cluster node domain + INodeDomain targetDomain = context.getComputationNodeDomain(); + List<ILocalStructuralProperty> noProp = new ArrayList<>(); + StructuralPropertiesVector[] requiredAtInputs = new StructuralPropertiesVector[2]; + requiredAtInputs[0] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR; + requiredAtInputs[1] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(targetDomain), noProp); + return new PhysicalRequirements(requiredAtInputs, IPartitioningRequirementsCoordinator.NO_COORDINATION); + } + + /** + * Forward operator delivers whatever properties delivered by the input located at index = 0 (tuples source op). + * Subtree at index 0 must compute its delivered properties before any call to this method + * @param op forward logical operator + * @param context {@link IOptimizationContext} + * @throws AlgebricksException + */ + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + ILogicalOperator dataSourceOperator = op.getInputs().get(0).getValue(); + deliveredProperties = dataSourceOperator.getDeliveredPhysicalProperties().clone(); + } + + /** + * The output record descriptor of forward operator is the same as the output record descriptor of the data source + * which is located at index 0. + * @param builder Hyracks job builder + * @param context job generation context + * @param op {@see {@link ForwardOperator}} + * @param propagatedSchema not used + * @param inputSchemas schemas of all inputs + * @param outerPlanSchema not used + * @throws AlgebricksException + */ + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + ForwardOperator forwardOp = (ForwardOperator) op; + RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor( + context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), inputSchemas[0], context); + ForwardOperatorDescriptor forwardDescriptor = + new ForwardOperatorDescriptor(builder.getJobSpec(), forwardOp.getRangeMapKey(), dataInputDescriptor); + builder.contributeHyracksOperator(forwardOp, forwardDescriptor); + ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue(); + builder.contributeGraphEdge(dataSource, 0, forwardOp, 0); + ILogicalOperator rangemapSource = forwardOp.getInputs().get(1).getValue(); + builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1); + } + + @Override + public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) { + int[] outputDependencyLabels = new int[] { 1 }; + int[] inputDependencyLabels = new int[] { 1, 0 }; + return new Pair<>(inputDependencyLabels, outputDependencyLabels); + } + + @Override + public boolean isMicroOperator() { + return false; + } + + @Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java index 6630d32..aeb9ac7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java @@ -42,27 +42,40 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; -import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.dataflow.common.data.partition.range.DynamicFieldRangePartitionComputerFactory; import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; public class RangePartitionExchangePOperator extends AbstractExchangePOperator { private List<OrderColumn> partitioningFields; private INodeDomain domain; - private IRangeMap rangeMap; + private RangeMap rangeMap; + private final boolean rangeMapIsComputedAtRunTime; + private final String rangeMapKeyInContext; - public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, - IRangeMap rangeMap) { + private RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeMap rangeMap, + boolean rangeMapIsComputedAtRunTime, String rangeMapKeyInContext) { this.partitioningFields = partitioningFields; this.domain = domain; this.rangeMap = rangeMap; + this.rangeMapIsComputedAtRunTime = rangeMapIsComputedAtRunTime; + this.rangeMapKeyInContext = rangeMapKeyInContext; + } + + public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, String rangeMapKeyInContext, + INodeDomain domain) { + this(partitioningFields, domain, null, true, rangeMapKeyInContext); + } + + public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, + RangeMap rangeMap) { + this(partitioningFields, domain, rangeMap, false, ""); } @Override @@ -80,8 +93,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { @Override public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - IPartitioningProperty p = - new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain); + IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<>(partitioningFields), domain); this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>()); } @@ -97,32 +109,31 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator { int n = partitioningFields.size(); int[] sortFields = new int[n]; IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n]; - - INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider(); - INormalizedKeyComputerFactory nkcf = null; - IVariableTypeEnvironment env = context.getTypeEnvironment(op); int i = 0; for (OrderColumn oc : partitioningFields) { LogicalVariable var = oc.getColumn(); sortFields[i] = opSchema.findVariable(var); Object type = env.getVarType(var); - OrderKind order = oc.getOrder(); - if (i == 0 && nkcfProvider != null && type != null) { - nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC); - } IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider(); comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC); i++; } - ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap); - IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf); - return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null); + FieldRangePartitionComputerFactory partitionerFactory; + if (rangeMapIsComputedAtRunTime) { + partitionerFactory = new DynamicFieldRangePartitionComputerFactory(sortFields, comps, rangeMapKeyInContext, + op.getSourceLocation()); + } else { + partitionerFactory = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap); + } + + IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory); + return new Pair<>(conn, null); } @Override public String toString() { - return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount(); + final String splitCount = rangeMap == null ? "" : " SPLIT COUNT:" + Integer.toString(rangeMap.getSplitCount()); + return getOperatorTag().toString() + " " + partitioningFields + splitCount; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java index ec32a53..b015193 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java @@ -53,18 +53,18 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory; import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; -import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.StaticFieldRangePartitionComputerFactory; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor; public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator { private List<OrderColumn> partitioningFields; private INodeDomain domain; - private IRangeMap rangeMap; + private RangeMap rangeMap; public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, - IRangeMap rangeMap) { + RangeMap rangeMap) { this.partitioningFields = partitioningFields; this.domain = domain; this.rangeMap = rangeMap; @@ -143,7 +143,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC); i++; } - ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap); + ITuplePartitionComputerFactory tpcf = new StaticFieldRangePartitionComputerFactory(sortFields, comps, rangeMap); IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf); return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java new file mode 100644 index 0000000..df0b446 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.api.dataflow.IConnectorDescriptor; +import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor; + +public class SequentialMergeExchangePOperator extends AbstractExchangePOperator { + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.SEQUENTIAL_MERGE_EXCHANGE; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { + return emptyUnaryRequirements(); + } + + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties(); + List<ILocalStructuralProperty> localProperties; + if (childLocalProps != null) { + localProperties = new ArrayList<>(childLocalProps); + } else { + localProperties = new ArrayList<>(0); + } + + deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties); + } + + @Override + public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor( + IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) + throws AlgebricksException { + IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec); + return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java index 99ed738..77f052e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java @@ -40,6 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -465,6 +466,13 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr } @Override + public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException { + addIndent(indent) + .append("forward: range-map = " + op.getRangeMapExpression().getValue().accept(exprVisitor, indent)); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("sink"); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index f1f1f3b..4a17cc6 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -647,6 +648,14 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat } @Override + public Void visitForwardOperator(ForwardOperator op, Integer indent) throws AlgebricksException { + addIndent(indent).append("\"operator\": \"forward\""); + addIndent(indent).append("\"expressions\": \"" + + op.getRangeMapExpression().getValue().accept(exprVisitor, indent).replace('"', ' ') + "\""); + return null; + } + + @Override public Void visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException { addIndent(indent).append("\"operator\": \"sink\""); return null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java index e7e98a5..b006a1e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalOrderProperty.java @@ -121,20 +121,7 @@ public final class LocalOrderProperty implements ILocalStructuralProperty { Iterator<OrderColumn> currentColumnIterator = orderColumns.iterator(); // Returns true if requiredColumnIterator is a prefix of currentColumnIterator. - return isPrefixOf(requiredColumnIterator, currentColumnIterator); - } - - private <T> boolean isPrefixOf(Iterator<T> requiredColumnIterator, Iterator<T> currentColumnIterator) { - while (requiredColumnIterator.hasNext()) { - T oc = requiredColumnIterator.next(); - if (!currentColumnIterator.hasNext()) { - return false; - } - if (!oc.equals(currentColumnIterator.next())) { - return false; - } - } - return true; + return PropertiesUtil.isPrefixOf(requiredColumnIterator, currentColumnIterator); } // Gets normalized ordering columns, where each column variable is a representative variable of its equivalence http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java index f2fed13..1c00e45 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java @@ -200,7 +200,7 @@ public class PropertiesUtil { * @param target * @return true iff pref is a prefix of target */ - private static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) { + public static <T> boolean isPrefixOf(Iterator<T> pref, Iterator<T> target) { while (pref.hasNext()) { T v = pref.next(); if (!target.hasNext()) { @@ -213,50 +213,65 @@ public class PropertiesUtil { return true; } + /** + * Normalizes or reduces the order columns argument based on the functional dependencies argument. The caller is + * responsible for taking caution as to how to handle the returned object since this method either returns the same + * object that is passed or returns a new object. + * @param orderColumns the order columns that are to be normalized + * @param functionalDependencies {@link FunctionalDependency} + * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned. + */ public static List<OrderColumn> applyFDsToOrderColumns(List<OrderColumn> orderColumns, - List<FunctionalDependency> fds) { - // the set of vars. is ordered - // so we try the variables in order from last to first - if (fds == null || fds.isEmpty()) { + List<FunctionalDependency> functionalDependencies) { + if (functionalDependencies == null || functionalDependencies.isEmpty()) { return orderColumns; } + // the set of vars. is ordered + // so we try the variables in order from last to first int deleted = 0; + boolean[] removedColumns = new boolean[orderColumns.size()]; for (int i = orderColumns.size() - 1; i >= 0; i--) { - for (FunctionalDependency fdep : fds) { - if (impliedByPrefix(orderColumns, i, fdep)) { - orderColumns.set(i, null); + for (FunctionalDependency functionalDependency : functionalDependencies) { + if (impliedByPrefix(orderColumns, i, functionalDependency)) { + removedColumns[i] = true; deleted++; break; } } } - List<OrderColumn> norm = new ArrayList<>(orderColumns.size() - deleted); - for (OrderColumn oc : orderColumns) { - if (oc != null) { - norm.add(oc); + List<OrderColumn> normalizedColumns = new ArrayList<>(orderColumns.size() - deleted); + for (int i = 0; i < orderColumns.size(); i++) { + if (!removedColumns[i]) { + normalizedColumns.add(orderColumns.get(i)); } } - return norm; + + return normalizedColumns; } + /** + * Normalizes or reduces the order columns argument based on the equivalenceClasses argument. The caller is + * responsible for taking caution as to how to handle the returned object since this method either returns the same + * object that is passed or returns a new object. + * @param orderColumns the order columns that are to be normalized + * @param equivalenceClasses {@link EquivalenceClass} + * @return a new normalized object if normalization is applied. Otherwise, the same argument object is returned. + */ public static List<OrderColumn> replaceOrderColumnsByEqClasses(List<OrderColumn> orderColumns, Map<LogicalVariable, EquivalenceClass> equivalenceClasses) { if (equivalenceClasses == null || equivalenceClasses.isEmpty()) { return orderColumns; } List<OrderColumn> norm = new ArrayList<>(); - for (OrderColumn v : orderColumns) { - EquivalenceClass ec = equivalenceClasses.get(v.getColumn()); - if (ec == null) { - norm.add(v); - } else { - if (ec.representativeIsConst()) { - // trivially satisfied, so the var. can be removed - } else { - norm.add(new OrderColumn(ec.getVariableRepresentative(), v.getOrder())); - } + for (OrderColumn orderColumn : orderColumns) { + EquivalenceClass columnEQClass = equivalenceClasses.get(orderColumn.getColumn()); + if (columnEQClass == null) { + norm.add(orderColumn); + } else if (!columnEQClass.representativeIsConst()) { + norm.add(new OrderColumn(columnEQClass.getVariableRepresentative(), orderColumn.getOrder())); } + // else columnEQClass rep. is constant, i.e. trivially satisfied, so the var. can be removed } return norm; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java index deb98b0..548a29f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java @@ -27,6 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -124,4 +125,5 @@ public interface ILogicalOperatorVisitor<R, T> { public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException; + public R visitForwardOperator(ForwardOperator op, T arg) throws AlgebricksException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java index 8779777..15bb54b 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java @@ -25,4 +25,6 @@ public class AlgebricksConfig { public static final String ALGEBRICKS_LOGGER_NAME = "org.apache.hyracks.algebricks"; public static final Logger ALGEBRICKS_LOGGER = LogManager.getLogger(ALGEBRICKS_LOGGER_NAME); + public static final int SORT_SAMPLES = 100; + public static final boolean SORT_PARALLEL = true; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java index 6fa378b..e3b1868 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPoli import org.apache.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy; import org.apache.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor; public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy { private static final long serialVersionUID = 1L; @@ -33,7 +34,8 @@ public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignme @Override public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) { - if (c instanceof MToNPartitioningMergingConnectorDescriptor) { + if (c instanceof MToNPartitioningMergingConnectorDescriptor + || c instanceof MToOneSequentialMergingConnectorDescriptor) { return senderSideMaterializePolicy; } else { return pipeliningPolicy; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java index 8eb9b90..a2a0ca1 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java @@ -37,29 +37,20 @@ import org.apache.logging.log4j.Level; public class HeuristicOptimizer { - public static PhysicalOperatorTag[] hyracksOperators = - new PhysicalOperatorTag[] { PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH, - PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, - PhysicalOperatorTag.HDFS_READER, PhysicalOperatorTag.HYBRID_HASH_JOIN, - PhysicalOperatorTag.IN_MEMORY_HASH_JOIN, PhysicalOperatorTag.NESTED_LOOP, - PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY, PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, - PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT, PhysicalOperatorTag.UNION_ALL }; - public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {}; - - public static boolean isHyracksOp(PhysicalOperatorTag opTag) { - for (PhysicalOperatorTag t : hyracksOperators) { - if (t == opTag) { - return true; - } - } - return false; - } - private final IOptimizationContext context; private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites; private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites; private final ILogicalPlan plan; + private static final PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] { + PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH, + PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER, + PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN, + PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY, + PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT, + PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD }; + public static final PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {}; + public HeuristicOptimizer(ILogicalPlan plan, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites, @@ -70,6 +61,15 @@ public class HeuristicOptimizer { this.physicalRewrites = physicalRewrites; } + public static boolean isHyracksOp(PhysicalOperatorTag opTag) { + for (PhysicalOperatorTag t : hyracksOperators) { + if (t == opTag) { + return true; + } + } + return false; + } + public void optimize() throws AlgebricksException { if (plan == null) { return; @@ -129,7 +129,6 @@ public class HeuristicOptimizer { if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) { AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting physical optimizations.\n"); } - // PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(plan); runOptimizationSets(plan, physicalRewrites); }
