This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new a555891 [NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
a555891 is described below
commit a5558912dfab8ad7c43b31b7d2fc809ce7fcf6a8
Author: James Fang <[email protected]>
AuthorDate: Sat Apr 20 15:21:10 2019 -0700
[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Abstract the activities out of the ForwardOperatorDescriptor so we can reuse
the same basic framework for multiple forward Operators.
- Abstract the ForwardOperatorDescriptor out of the ForwardPOperator so we can
reuse the same basic framework for multiple forward Operators.
Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3338
Contrib: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
.../rules/SweepIllegalNonfunctionalFunctions.java | 2 +-
.../algebra/operators/logical/ForwardOperator.java | 30 ++++-----
.../visitors/IsomorphismOperatorVisitor.java | 6 +-
...calOperatorDeepCopyWithNewVariablesVisitor.java | 4 +-
.../logical/visitors/OperatorDeepCopyVisitor.java | 2 +-
.../visitors/SubstituteVariableVisitor.java | 2 +-
.../logical/visitors/UsedVariableVisitor.java | 2 +-
...Operator.java => AbstractForwardPOperator.java} | 36 +++++++----
.../operators/physical/SortForwardPOperator.java | 34 ++++++++++
.../LogicalOperatorPrettyPrintVisitor.java | 2 +-
.../LogicalOperatorPrettyPrintVisitorJson.java | 2 +-
.../core/utils/LogicalOperatorDotVisitor.java | 2 +-
.../rules/EnforceStructuralPropertiesRule.java | 4 +-
.../rules/SetAlgebricksPhysicalOperatorsRule.java | 4 +-
.../base/AbstractForwardOperatorDescriptor.java | 73 ++++++++++++++++++++++
...tor.java => SortForwardOperatorDescriptor.java} | 44 ++++---------
16 files changed, 174 insertions(+), 75 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 86b2b88..2d5e11e 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -310,7 +310,7 @@ public class SweepIllegalNonfunctionalFunctions implements
IAlgebraicRewriteRule
@Override
public Void visitForwardOperator(ForwardOperator op, Void arg) throws
AlgebricksException {
- sweepExpression(op.getRangeMapExpression().getValue());
+ sweepExpression(op.getSideDataExpression().getValue());
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
index bc24b49..70e36e5 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ForwardOperator.java
@@ -33,30 +33,30 @@ import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionRef
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
/**
- * Forward operator is used to forward data to different NCs based on a range
map that is computed dynamically
- * by doing a pass over the data itself to infer the range map. The operator
takes two inputs:
+ * Forward operator is used to forward data to different NCs based on the side
data activity that is computed
+ * dynamically by doing a pass over the data itself to infer the range map.
The operator takes two inputs:
* 1. Tuples/data (at index 0). The data is forwarded to the range-based
connector which routes it to the target NC.
- * 2. Range map (at index 1). The range map will be stored in Hyracks context,
and the connector will pick it up.
- * Forward operator will receive the range map when it is broadcast by the
operator generating the range map after which
- * the forward operator will start forwarding the data.
+ * 2. Side Activity (at index 1). The output will be stored in Hyracks
context, and the connector will pick it up.
+ * Forward operator will receive the range map when it is broadcast by the
operator generating the side activity output
+ * after which the forward operator will start forwarding the data.
*/
public class ForwardOperator extends AbstractLogicalOperator {
- private final String rangeMapKey;
- private final Mutable<ILogicalExpression> rangeMapExpression;
+ private final String sideDataKey;
+ private final Mutable<ILogicalExpression> sideDataExpression;
- public ForwardOperator(String rangeMapKey, Mutable<ILogicalExpression>
rangeMapExpression) {
+ public ForwardOperator(String sideDataKey, Mutable<ILogicalExpression>
sideDataExpression) {
super();
- this.rangeMapKey = rangeMapKey;
- this.rangeMapExpression = rangeMapExpression;
+ this.sideDataKey = sideDataKey;
+ this.sideDataExpression = sideDataExpression;
}
- public String getRangeMapKey() {
- return rangeMapKey;
+ public String getSideDataKey() {
+ return sideDataKey;
}
- public Mutable<ILogicalExpression> getRangeMapExpression() {
- return rangeMapExpression;
+ public Mutable<ILogicalExpression> getSideDataExpression() {
+ return sideDataExpression;
}
@Override
@@ -72,7 +72,7 @@ public class ForwardOperator extends AbstractLogicalOperator {
@Override
public boolean
acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws
AlgebricksException {
- return visitor.transform(rangeMapExpression);
+ return visitor.transform(sideDataExpression);
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dd3053b..09358dd 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -605,9 +605,9 @@ public class IsomorphismOperatorVisitor implements
ILogicalOperatorVisitor<Boole
return Boolean.FALSE;
}
ForwardOperator otherOp = (ForwardOperator) copyAndSubstituteVar(op,
arg);
- ILogicalExpression rangeMapExp = op.getRangeMapExpression().getValue();
- ILogicalExpression otherRangeMapExp =
otherOp.getRangeMapExpression().getValue();
- return rangeMapExp.equals(otherRangeMapExp) &&
op.getRangeMapKey().equals(otherOp.getRangeMapKey());
+ ILogicalExpression rangeMapExp = op.getSideDataExpression().getValue();
+ ILogicalExpression otherRangeMapExp =
otherOp.getSideDataExpression().getValue();
+ return rangeMapExp.equals(otherRangeMapExp) &&
op.getSideDataKey().equals(otherOp.getSideDataKey());
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 99e852d..34b0ae6 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -590,8 +590,8 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
@Override
public ILogicalOperator visitForwardOperator(ForwardOperator op,
ILogicalOperator arg) throws AlgebricksException {
- ForwardOperator opCopy = new ForwardOperator(op.getRangeMapKey(),
-
exprDeepCopyVisitor.deepCopyExpressionReference(op.getRangeMapExpression()));
+ ForwardOperator opCopy = new ForwardOperator(op.getSideDataKey(),
+
exprDeepCopyVisitor.deepCopyExpressionReference(op.getSideDataExpression()));
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5be91cc..1727d10 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -345,7 +345,7 @@ public class OperatorDeepCopyVisitor implements
ILogicalOperatorVisitor<ILogical
@Override
public ILogicalOperator visitForwardOperator(ForwardOperator op, Void arg)
throws AlgebricksException {
- return new ForwardOperator(op.getRangeMapKey(),
deepCopyExpressionRef(op.getRangeMapExpression()));
+ return new ForwardOperator(op.getSideDataKey(),
deepCopyExpressionRef(op.getSideDataExpression()));
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 550a208..028bf9f 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -454,7 +454,7 @@ public class SubstituteVariableVisitor
@Override
public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable,
LogicalVariable> arg)
throws AlgebricksException {
- op.getRangeMapExpression().getValue().substituteVar(arg.first,
arg.second);
+ op.getSideDataExpression().getValue().substituteVar(arg.first,
arg.second);
substVarTypes(op, arg);
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 39b9689..845a853 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -428,7 +428,7 @@ public class UsedVariableVisitor implements
ILogicalOperatorVisitor<Void, Void>
@Override
public Void visitForwardOperator(ForwardOperator op, Void arg) throws
AlgebricksException {
- op.getRangeMapExpression().getValue().getUsedVariables(usedVariables);
+ op.getSideDataExpression().getValue().getUsedVariables(usedVariables);
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
similarity index 83%
rename from
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
rename to
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
index 11c584e..778af18 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractForwardPOperator.java
@@ -39,17 +39,17 @@ import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
/**
* <pre>
- * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
- * idx0: Input data source --
- * |-- forward op.
- * idx1: RangeMap generator--
+ * {@see {@link ForwardOperator} and {@link AbstractForwardOperatorDescriptor}}
+ * idx0: Input data source --
+ * |-- forward op.
+ * idx1: Side activity output --
* </pre>
*/
-public class ForwardPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractForwardPOperator extends
AbstractPhysicalOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -57,8 +57,18 @@ public class ForwardPOperator extends
AbstractPhysicalOperator {
}
/**
- * Forward operator requires that the global aggregate operator broadcasts
the range map. No required properties at
- * the data source input.
+ * Get the correct Forward Operator Descriptor
+ * @param builder Hyracks job builder
+ * @param forwardOp Forward Operator
+ * @param dataInputDescriptor Data input descriptor
+ * @return return the correct operator descriptor
+ */
+ public abstract AbstractForwardOperatorDescriptor
getOperatorDescriptor(IHyracksJobBuilder builder,
+ ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor);
+
+ /**
+ * Forward operator requires that the global aggregate operator broadcasts
side activity output.
+ * No required properties at the data source input.
* @param op {@see {@link ForwardOperator}}
* @param requiredByParent parent's requirements, which are not enforced
for now, as we only explore one plan
* @param context the optimization context
@@ -67,7 +77,7 @@ public class ForwardPOperator extends
AbstractPhysicalOperator {
@Override
public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector requiredByParent, IOptimizationContext
context) {
- // broadcast the range map to the cluster node domain
+ // broadcast the side activity output to the cluster node domain
INodeDomain targetDomain = context.getComputationNodeDomain();
List<ILocalStructuralProperty> noProp = new ArrayList<>();
StructuralPropertiesVector[] requiredAtInputs = new
StructuralPropertiesVector[2];
@@ -108,13 +118,13 @@ public class ForwardPOperator extends
AbstractPhysicalOperator {
ForwardOperator forwardOp = (ForwardOperator) op;
RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()),
inputSchemas[0], context);
- ForwardOperatorDescriptor forwardDescriptor =
- new ForwardOperatorDescriptor(builder.getJobSpec(),
forwardOp.getRangeMapKey(), dataInputDescriptor);
+ AbstractForwardOperatorDescriptor forwardDescriptor =
+ getOperatorDescriptor(builder, forwardOp, dataInputDescriptor);
builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
- ILogicalOperator rangemapSource =
forwardOp.getInputs().get(1).getValue();
- builder.contributeGraphEdge(rangemapSource, 0, forwardOp, 1);
+ ILogicalOperator sideDataSource =
forwardOp.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(sideDataSource, 0, forwardOp, 1);
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
new file mode 100644
index 0000000..0903b5e
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortForwardPOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor;
+
+public class SortForwardPOperator extends AbstractForwardPOperator {
+
+ @Override
+ public AbstractForwardOperatorDescriptor
getOperatorDescriptor(IHyracksJobBuilder builder,
+ ForwardOperator forwardOp, RecordDescriptor dataInputDescriptor) {
+ return new SortForwardOperatorDescriptor(builder.getJobSpec(),
forwardOp.getSideDataKey(), dataInputDescriptor);
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 27d4ced..4128c8b 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -467,7 +467,7 @@ public class LogicalOperatorPrettyPrintVisitor extends
AbstractLogicalOperatorPr
@Override
public Void visitForwardOperator(ForwardOperator op, Integer indent)
throws AlgebricksException {
addIndent(indent)
- .append("forward: range-map = " +
op.getRangeMapExpression().getValue().accept(exprVisitor, indent));
+ .append("forward: range-map = " +
op.getSideDataExpression().getValue().accept(exprVisitor, indent));
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index f5ff12f..e56a8bd 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -639,7 +639,7 @@ public class LogicalOperatorPrettyPrintVisitorJson extends
AbstractLogicalOperat
addIndent(indent).append("\"operator\": \"forward\"");
addIndent(0).append(",\n");
addIndent(indent).append("\"expressions\": \""
- + op.getRangeMapExpression().getValue().accept(exprVisitor,
indent).replace('"', ' ') + "\"");
+ + op.getSideDataExpression().getValue().accept(exprVisitor,
indent).replace('"', ' ') + "\"");
return null;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 1c761fa..795fb10 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -590,7 +590,7 @@ public class LogicalOperatorDotVisitor implements
ILogicalOperatorVisitor<String
@Override
public String visitForwardOperator(ForwardOperator op, Boolean
showDetails) throws AlgebricksException {
stringBuilder.setLength(0);
-
stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")");
+
stringBuilder.append("forward(").append(op.getSideDataExpression().getValue().toString()).append(")");
appendSchema(op, showDetails);
appendAnnotations(op, showDetails);
appendPhysicalOperatorInfo(op, showDetails);
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f66a6b8..7dc596c 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -65,7 +65,6 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractSta
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -76,6 +75,7 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartit
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
@@ -880,7 +880,7 @@ public class EnforceStructuralPropertiesRule implements
IAlgebraicRewriteRule {
AbstractLogicalExpression rangeMapExpression = new
VariableReferenceExpression(rangeMapVariable, sourceLoc);
ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new
MutableObject<>(rangeMapExpression));
forwardOperator.setSourceLocation(sourceLoc);
- forwardOperator.setPhysicalOperator(new ForwardPOperator());
+ forwardOperator.setPhysicalOperator(new SortForwardPOperator());
forwardOperator.getInputs().add(exchangeOpFromReplicate);
forwardOperator.getInputs().add(globalAggInput);
OperatorManipulationUtil.setOperatorMode(forwardOperator);
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 612f79e..a7bf11e 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -61,7 +61,6 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceS
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
@@ -78,6 +77,7 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePO
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SortForwardPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SplitPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
@@ -398,7 +398,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements
IAlgebraicRewriteRule
break;
}
case FORWARD:
- op.setPhysicalOperator(new ForwardPOperator());
+ op.setPhysicalOperator(new SortForwardPOperator());
break;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
new file mode 100644
index 0000000..b57ad16
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+// TODO(ali): forward operator should probably be moved to asterix layer
+public abstract class AbstractForwardOperatorDescriptor extends
AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ protected static final int FORWARD_DATA_ACTIVITY_ID = 0;
+ protected static final int SIDE_DATA_ACTIVITY_ID = 1;
+ protected String sideDataKey;
+
+ /**
+ * @param spec used to create the operator id.
+ * @param sideDataKey the key used to store the output of the side activity
+ * @param outputRecordDescriptor the output schema of this operator.
+ */
+ public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec,
String sideDataKey,
+ RecordDescriptor outputRecordDescriptor) {
+ super(spec, 2, 1);
+ outRecDescs[0] = outputRecordDescriptor;
+ this.sideDataKey = sideDataKey;
+ }
+
+ /**
+ * @return the forward data activity
+ */
+ public abstract AbstractActivityNode createForwardDataActivity();
+
+ /**
+ * @return the side data activity
+ */
+ public abstract AbstractActivityNode createSideDataActivity();
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ AbstractActivityNode forwardDataActivity = createForwardDataActivity();
+ AbstractActivityNode sideDataActivity = createSideDataActivity();
+
+ // side data activity, its input is coming through the operator's
in-port = 1 & activity's in-port = 0
+ builder.addActivity(this, sideDataActivity);
+ builder.addSourceEdge(1, sideDataActivity, 0);
+
+ // forward data activity, its input is coming through the operator's
in-port = 0 & activity's in-port = 0
+ builder.addActivity(this, forwardDataActivity);
+ builder.addSourceEdge(0, forwardDataActivity, 0);
+
+ // forward data activity will wait for the side data activity
+ builder.addBlockingEdge(sideDataActivity, forwardDataActivity);
+
+ // data leaves from the operator's out-port = 0 & forward data
activity's out-port = 0
+ builder.addTargetEdge(0, forwardDataActivity, 0);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
similarity index 83%
rename from
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
rename to
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 49eea0a..1daf9fb 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -42,50 +41,33 @@ import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDese
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
// TODO(ali): forward operator should probably be moved to asterix layer
-public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final int FORWARD_DATA_ACTIVITY_ID = 0;
- private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
- private final String rangeMapKeyInContext;
/**
* @param spec used to create the operator id.
- * @param rangeMapKeyInContext the unique key to store the range map in the shared map & transfer it to partitioner.
+ * @param sideDataKey the unique key to store the range map in the shared map & transfer it to partitioner.
* @param outputRecordDescriptor the output schema of this operator.
*/
- public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String rangeMapKeyInContext,
+ public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String sideDataKey,
RecordDescriptor outputRecordDescriptor) {
- super(spec, 2, 1);
- this.rangeMapKeyInContext = rangeMapKeyInContext;
- outRecDescs[0] = outputRecordDescriptor;
+ super(spec, sideDataKey, outputRecordDescriptor);
}
@Override
- public void contributeActivities(IActivityGraphBuilder builder) {
- ForwardDataActivity forwardDataActivity =
- new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
- RangeMapReaderActivity rangeMapReaderActivity =
- new RangeMapReaderActivity(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID));
-
- // range map reader activity, its input is coming through the operator's in-port = 1 & activity's in-port = 0
- builder.addActivity(this, rangeMapReaderActivity);
- builder.addSourceEdge(1, rangeMapReaderActivity, 0);
-
- // forward data activity, its input is coming through the operator's in-port = 0 & activity's in-port = 0
- builder.addActivity(this, forwardDataActivity);
- builder.addSourceEdge(0, forwardDataActivity, 0);
-
- // forward data activity will wait for the range map reader activity
- builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);
+ public AbstractActivityNode createForwardDataActivity() {
+ return new ForwardDataActivity(new ActivityId(odId, FORWARD_DATA_ACTIVITY_ID));
+ }
- // data leaves from the operator's out-port = 0 & forward data activity's out-port = 0
- builder.addTargetEdge(0, forwardDataActivity, 0);
+ @Override
+ public AbstractActivityNode createSideDataActivity() {
+ return new RangeMapReaderActivity(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID));
}
/**
@@ -221,9 +203,9 @@ public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
public void open() throws HyracksDataException {
// retrieve the range map from the state object (previous activity should have already stored it)
// then deposit it into the ctx so that MToN-partition can pick it up
- Object stateObjKey = new TaskId(new ActivityId(odId, RANGEMAP_READER_ACTIVITY_ID), partition);
+ Object stateObjKey = new TaskId(new ActivityId(odId, SIDE_DATA_ACTIVITY_ID), partition);
RangeMapState rangeMapState = (RangeMapState) ctx.getStateObject(stateObjKey);
- TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
+ TaskUtil.put(sideDataKey, rangeMapState.rangeMap, ctx);
writer.open();
}