This is an automated email from the ASF dual-hosted git repository.
dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new e95dba6 [NO ISSUE][COMP] Refactor physical operator assignment rules
e95dba6 is described below
commit e95dba63ce0f838a73b4908407b453bb3348c26b
Author: Dmitry Lychagin <[email protected]>
AuthorDate: Mon Apr 29 15:16:48 2019 -0700
[NO ISSUE][COMP] Refactor physical operator assignment rules
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Refactor SetAlgebricksPhysicalOperatorsRule and make it extensible
- Make SetAsterixPhysicalOperatorsRule a subclass of
SetAlgebricksPhysicalOperatorsRule
- Remove SetAlgebricksPhysicalOperatorsRule from the Asterix rule set and
replace its invocations with SetAsterixPhysicalOperatorsRule
Change-Id: I502f367464a6fabc595cff804722f793e052570f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3367
Contrib: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../asterix/optimizer/base/RuleCollections.java | 8 +-
.../rules/SetAsterixPhysicalOperatorsRule.java | 474 +++++-------
.../algebra/operators/logical/WindowOperator.java | 27 +
.../rules/SetAlgebricksPhysicalOperatorsRule.java | 807 ++++++++++++---------
.../apache/hyracks/api/exceptions/ErrorCode.java | 3 +-
.../src/main/resources/errormsg/en.properties | 3 +-
6 files changed, 671 insertions(+), 651 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 8677d0f..6a11abf 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -132,7 +132,6 @@ import
org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantWindowOperato
import
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
import
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
-import
org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
@@ -359,7 +358,6 @@ public final class RuleCollections {
physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
//Turned off the following rule for now not to change OptimizerTest
results.
physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
- physicalRewritesAllLevels.add(new
SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new
AddEquivalenceClassForRecordConstructorRule());
physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
@@ -373,7 +371,7 @@ public final class RuleCollections {
physicalRewritesAllLevels.add(new
RemoveUnusedAssignAndAggregateRule());
physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
// After adding projects, we may need need to set physical operators
again.
- physicalRewritesAllLevels.add(new
SetAlgebricksPhysicalOperatorsRule());
+ physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
return physicalRewritesAllLevels;
}
@@ -390,7 +388,7 @@ public final class RuleCollections {
// remove assigns that could become unused after
PushLimitIntoPrimarySearchRule
physicalRewritesTopLevel.add(new RemoveUnusedAssignAndAggregateRule());
physicalRewritesTopLevel.add(new IntroduceProjectsRule());
- physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
+ physicalRewritesTopLevel.add(new SetAsterixPhysicalOperatorsRule());
physicalRewritesTopLevel.add(new
IntroduceRapidFrameFlushProjectAssignRule());
physicalRewritesTopLevel.add(new SetExecutionModeRule());
physicalRewritesTopLevel.add(new
IntroduceRandomPartitioningFeedComputationRule());
@@ -400,7 +398,7 @@ public final class RuleCollections {
public static final List<IAlgebraicRewriteRule>
prepareForJobGenRuleCollection() {
List<IAlgebraicRewriteRule> prepareForJobGenRewrites = new
LinkedList<>();
prepareForJobGenRewrites.add(new InsertProjectBeforeUnionRule());
- prepareForJobGenRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
+ prepareForJobGenRewrites.add(new SetAsterixPhysicalOperatorsRule());
prepareForJobGenRewrites
.add(new
IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 4314b3a..b26eaca 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -37,349 +37,223 @@ import
org.apache.asterix.optimizer.rules.am.BTreeJobGenParams;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import
org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
-public class SetAsterixPhysicalOperatorsRule implements IAlgebraicRewriteRule {
+public final class SetAsterixPhysicalOperatorsRule extends
SetAlgebricksPhysicalOperatorsRule {
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
- throws AlgebricksException {
- return false;
+ protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean>
createPhysicalOperatorFactoryVisitor(
+ IOptimizationContext context) {
+ return new AsterixPhysicalOperatorFactoryVisitor(context);
}
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
- throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator)
opRef.getValue();
- if (context.checkIfInDontApplySet(this, op)) {
- return false;
+ private static class AsterixPhysicalOperatorFactoryVisitor extends
AlgebricksPhysicalOperatorFactoryVisitor {
+
+ private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext
context) {
+ super(context);
}
- computeDefaultPhysicalOp(op, true, context);
- context.addToDontApplySet(this, op);
- return true;
- }
+ @Override
+ public ExternalGroupByPOperator
createExternalGroupByPOperator(GroupByOperator gby) throws AlgebricksException {
+ Mutable<ILogicalOperator> r0 =
gby.getNestedPlans().get(0).getRoots().get(0);
+ if
(!r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
+ return null;
+ }
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ boolean serializable = aggOp.getExpressions().stream()
+ .allMatch(exprRef -> exprRef.getValue().getExpressionTag()
== LogicalExpressionTag.FUNCTION_CALL
+ &&
BuiltinFunctions.isAggregateFunctionSerializable(
+ ((AbstractFunctionCallExpression)
exprRef.getValue()).getFunctionIdentifier()));
+ if (!serializable) {
+ return null;
+ }
- private static void setPhysicalOperators(ILogicalPlan plan, boolean
topLevelOp, IOptimizationContext context)
- throws AlgebricksException {
- for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
root.getValue(), topLevelOp, context);
- }
- }
+ // if serializable, use external group-by
+ // now check whether the serialized version aggregation function
has corresponding intermediate agg
+ IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ List<LogicalVariable> originalVariables = aggOp.getVariables();
+ List<Mutable<ILogicalExpression>> aggExprs =
aggOp.getExpressions();
+ int aggNum = aggExprs.size();
+ for (int i = 0; i < aggNum; i++) {
+ AbstractFunctionCallExpression expr =
(AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+ AggregateFunctionCallExpression serialAggExpr =
BuiltinFunctions
+
.makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
expr.getArguments());
+ serialAggExpr.setSourceLocation(expr.getSourceLocation());
+ if
(mergeAggregationExpressionFactory.createMergeAggregation(originalVariables.get(i),
serialAggExpr,
+ context) == null) {
+ return null;
+ }
+ }
- private static void computeDefaultPhysicalOp(AbstractLogicalOperator op,
boolean topLevelOp,
- IOptimizationContext context) throws AlgebricksException {
- PhysicalOptimizationConfig physicalOptimizationConfig =
context.getPhysicalOptimizationConfig();
- if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
- GroupByOperator gby = (GroupByOperator) op;
- if (gby.getNestedPlans().size() == 1) {
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() == 1) {
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- if
(r0.getValue().getOperatorTag().equals(LogicalOperatorTag.AGGREGATE)) {
- AggregateOperator aggOp = (AggregateOperator)
r0.getValue();
- boolean serializable = true;
- for (Mutable<ILogicalExpression> exprRef :
aggOp.getExpressions()) {
- AbstractFunctionCallExpression expr =
(AbstractFunctionCallExpression) exprRef.getValue();
- if
(!BuiltinFunctions.isAggregateFunctionSerializable(expr.getFunctionIdentifier()))
{
- serializable = false;
- break;
- }
- }
+ // Check whether there are multiple aggregates in the sub plan.
+ // Currently, we don't support multiple aggregates in one external
group-by.
+ ILogicalOperator r1Logical = aggOp;
+ while (r1Logical.hasInputs()) {
+ r1Logical = r1Logical.getInputs().get(0).getValue();
+ if (r1Logical.getOperatorTag() ==
LogicalOperatorTag.AGGREGATE) {
+ return null;
+ }
+ }
- if
((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) ==
Boolean.TRUE || gby
-
.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) ==
Boolean.TRUE)) {
- boolean setToExternalGby = false;
- if (serializable) {
- // if serializable, use external group-by
- // now check whether the serialized version
aggregation function has corresponding intermediate agg
- boolean hasIntermediateAgg = true;
- IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory =
-
context.getMergeAggregationExpressionFactory();
- List<LogicalVariable> originalVariables =
aggOp.getVariables();
- List<Mutable<ILogicalExpression>> aggExprs =
aggOp.getExpressions();
- int aggNum = aggExprs.size();
- for (int i = 0; i < aggNum; i++) {
- AbstractFunctionCallExpression expr =
- (AbstractFunctionCallExpression)
aggExprs.get(i).getValue();
- AggregateFunctionCallExpression
serialAggExpr =
-
BuiltinFunctions.makeSerializableAggregateFunctionExpression(
-
expr.getFunctionIdentifier(), expr.getArguments());
-
serialAggExpr.setSourceLocation(expr.getSourceLocation());
- if
(mergeAggregationExpressionFactory.createMergeAggregation(
- originalVariables.get(i),
serialAggExpr, context) == null) {
- hasIntermediateAgg = false;
- break;
- }
- }
+ for (int i = 0; i < aggNum; i++) {
+ AbstractFunctionCallExpression expr =
(AbstractFunctionCallExpression) aggExprs.get(i).getValue();
+ AggregateFunctionCallExpression serialAggExpr =
BuiltinFunctions
+
.makeSerializableAggregateFunctionExpression(expr.getFunctionIdentifier(),
expr.getArguments());
+ serialAggExpr.setSourceLocation(expr.getSourceLocation());
+ aggOp.getExpressions().get(i).setValue(serialAggExpr);
+ }
- // Check whether there are multiple aggregates
in the sub plan.
- // Currently, we don't support multiple
aggregates in one external group-by.
- boolean multipleAggOpsFound = false;
- ILogicalOperator r1Logical = aggOp;
- while (r1Logical.hasInputs()) {
- r1Logical =
r1Logical.getInputs().get(0).getValue();
- if (r1Logical.getOperatorTag() ==
LogicalOperatorTag.AGGREGATE) {
- multipleAggOpsFound = true;
- break;
- }
- }
+ generateMergeAggregationExpressions(gby);
- if (hasIntermediateAgg &&
!multipleAggOpsFound) {
- for (int i = 0; i < aggNum; i++) {
- AbstractFunctionCallExpression expr =
-
(AbstractFunctionCallExpression) aggExprs.get(i).getValue();
- AggregateFunctionCallExpression
serialAggExpr =
-
BuiltinFunctions.makeSerializableAggregateFunctionExpression(
-
expr.getFunctionIdentifier(), expr.getArguments());
-
serialAggExpr.setSourceLocation(expr.getSourceLocation());
-
aggOp.getExpressions().get(i).setValue(serialAggExpr);
- }
- ExternalGroupByPOperator externalGby =
- new
ExternalGroupByPOperator(gby.getGroupByVarList(),
-
physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long)
physicalOptimizationConfig.getMaxFramesForGroupBy()
- *
physicalOptimizationConfig.getFrameSize());
- generateMergeAggregationExpressions(gby,
context);
- op.setPhysicalOperator(externalGby);
- setToExternalGby = true;
- }
- }
+ return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
+ }
- if (!setToExternalGby) {
- // if not serializable or no intermediate agg,
use pre-clustered group-by
- List<Pair<LogicalVariable,
Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new
ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable,
Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr =
p.second.getValue();
- if (expr.getExpressionTag() ==
LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef =
(VariableReferenceExpression) expr;
-
columnList.add(varRef.getVariableReference());
- }
- }
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
-
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- }
- }
- } else if
(r0.getValue().getOperatorTag().equals(LogicalOperatorTag.RUNNINGAGGREGATE)) {
- List<Pair<LogicalVariable,
Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
- List<LogicalVariable> columnList = new
ArrayList<LogicalVariable>(gbyList.size());
- for (Pair<LogicalVariable,
Mutable<ILogicalExpression>> p : gbyList) {
- ILogicalExpression expr = p.second.getValue();
- if (expr.getExpressionTag() ==
LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef =
(VariableReferenceExpression) expr;
- columnList.add(varRef.getVariableReference());
- }
- }
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
-
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- } else {
- throw new
CompilationException(ErrorCode.COMPILATION_ERROR, gby.getSourceLocation(),
- "Unsupported nested operator within a
group-by: "
- +
r0.getValue().getOperatorTag().name());
- }
- }
+ private void generateMergeAggregationExpressions(GroupByOperator gby)
throws AlgebricksException {
+ if (gby.getNestedPlans().size() != 1) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
+ "External group-by currently works only for one nested
plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
}
- }
- if (op.getPhysicalOperator() == null) {
- switch (op.getOperatorTag()) {
- case INNERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp,
context);
- break;
- }
- case LEFTOUTERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op,
topLevelOp, context);
- break;
- }
- case UNNEST_MAP:
- case LEFT_OUTER_UNNEST_MAP: {
- ILogicalExpression unnestExpr = null;
- unnestExpr = ((AbstractUnnestMapOperator)
op).getExpressionRef().getValue();
- if (unnestExpr.getExpressionTag() ==
LogicalExpressionTag.FUNCTION_CALL) {
- AbstractFunctionCallExpression f =
(AbstractFunctionCallExpression) unnestExpr;
- FunctionIdentifier fid = f.getFunctionIdentifier();
- if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) {
- throw new IllegalStateException();
- }
- AccessMethodJobGenParams jobGenParams = new
AccessMethodJobGenParams();
- jobGenParams.readFromFuncArgs(f.getArguments());
- MetadataProvider mp = (MetadataProvider)
context.getMetadataProvider();
- DataSourceId dataSourceId =
- new
DataSourceId(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
- Dataset dataset =
-
mp.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
- IDataSourceIndex<String, DataSourceId> dsi =
-
mp.findDataSourceIndex(jobGenParams.getIndexName(), dataSourceId);
- INodeDomain storageDomain =
mp.findNodeDomain(dataset.getNodeGroupName());
- if (dsi == null) {
- throw new
CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
- "Could not find index " +
jobGenParams.getIndexName() + " for dataset "
- + dataSourceId);
- }
- IndexType indexType = jobGenParams.getIndexType();
- boolean requiresBroadcast =
jobGenParams.getRequiresBroadcast();
- switch (indexType) {
- case BTREE: {
- BTreeJobGenParams btreeJobGenParams = new
BTreeJobGenParams();
-
btreeJobGenParams.readFromFuncArgs(f.getArguments());
- op.setPhysicalOperator(new
BTreeSearchPOperator(dsi, storageDomain, requiresBroadcast,
- btreeJobGenParams.isPrimaryIndex(),
btreeJobGenParams.isEqCondition(),
- btreeJobGenParams.getLowKeyVarList(),
btreeJobGenParams.getHighKeyVarList()));
- break;
- }
- case RTREE: {
- op.setPhysicalOperator(new
RTreeSearchPOperator(dsi, storageDomain, requiresBroadcast));
- break;
- }
- case SINGLE_PARTITION_WORD_INVIX:
- case SINGLE_PARTITION_NGRAM_INVIX: {
- op.setPhysicalOperator(
- new InvertedIndexPOperator(dsi,
storageDomain, requiresBroadcast, false));
- break;
- }
- case LENGTH_PARTITIONED_WORD_INVIX:
- case LENGTH_PARTITIONED_NGRAM_INVIX: {
- op.setPhysicalOperator(
- new InvertedIndexPOperator(dsi,
storageDomain, requiresBroadcast, true));
- break;
- }
- default: {
- throw new NotImplementedException(indexType +
" indexes are not implemented.");
- }
- }
- }
- break;
- }
- case WINDOW: {
- WindowOperator winOp = (WindowOperator) op;
- WindowPOperator physOp = createWindowPOperator(winOp,
context);
- op.setPhysicalOperator(physOp);
- break;
- }
+ ILogicalPlan p0 = gby.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
+ "External group-by currently works only for one nested
plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
}
- }
- if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans nested =
(AbstractOperatorWithNestedPlans) op;
- for (ILogicalPlan p : nested.getNestedPlans()) {
- setPhysicalOperators(p, false, context);
+ IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+ AbstractLogicalOperator r0Logical = (AbstractLogicalOperator)
r0.getValue();
+ if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
+ "The merge aggregation expression generation should
not process a " + r0Logical.getOperatorTag()
+ + " operator.");
}
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ List<Mutable<ILogicalExpression>> aggFuncRefs =
aggOp.getExpressions();
+ List<LogicalVariable> aggProducedVars = aggOp.getVariables();
+ int n = aggOp.getExpressions().size();
+ List<Mutable<ILogicalExpression>> mergeExpressionRefs = new
ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
+ ILogicalExpression mergeExpr =
mergeAggregationExpressionFactory
+ .createMergeAggregation(aggProducedVars.get(i),
aggFuncExpr, context);
+ if (mergeExpr == null) {
+ throw new
CompilationException(ErrorCode.COMPILATION_ERROR,
aggFuncExpr.getSourceLocation(),
+ "The aggregation function "
+ + ((AbstractFunctionCallExpression)
aggFuncExpr).getFunctionIdentifier().getName()
+ + " does not have a registered
intermediate aggregation function.");
+ }
+ mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+ }
+ aggOp.setMergeExpressions(mergeExpressionRefs);
}
- for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
opRef.getValue(), topLevelOp, context);
- }
- }
- private static void generateMergeAggregationExpressions(GroupByOperator
gby, IOptimizationContext context)
- throws AlgebricksException {
- if (gby.getNestedPlans().size() != 1) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
- "External group-by currently works only for one nested
plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() != 1) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
- "External group-by currently works only for one nested
plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
+ @Override
+ public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op);
}
- IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
- context.getMergeAggregationExpressionFactory();
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- AbstractLogicalOperator r0Logical = (AbstractLogicalOperator)
r0.getValue();
- if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR,
gby.getSourceLocation(),
- "The merge aggregation expression generation should not
process a " + r0Logical.getOperatorTag()
- + " operator.");
- }
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
- List<LogicalVariable> aggProducedVars = aggOp.getVariables();
- int n = aggOp.getExpressions().size();
- List<Mutable<ILogicalExpression>> mergeExpressionRefs = new
ArrayList<Mutable<ILogicalExpression>>();
- for (int i = 0; i < n; i++) {
- ILogicalExpression aggFuncExpr = aggFuncRefs.get(i).getValue();
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
- .createMergeAggregation(aggProducedVars.get(i),
aggFuncExpr, context);
- if (mergeExpr == null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR,
aggFuncExpr.getSourceLocation(),
- "The aggregation function "
- + ((AbstractFunctionCallExpression)
aggFuncExpr).getFunctionIdentifier().getName()
- + " does not have a registered intermediate
aggregation function.");
- }
- mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+
+ @Override
+ public IPhysicalOperator
visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean
topLevelOp)
+ throws AlgebricksException {
+ return visitAbstractUnnestMapOperator(op);
}
- aggOp.setMergeExpressions(mergeExpressionRefs);
- }
- private static WindowPOperator createWindowPOperator(WindowOperator winOp,
IOptimizationContext context)
- throws CompilationException {
- List<Mutable<ILogicalExpression>> partitionExprs =
winOp.getPartitionExpressions();
- List<LogicalVariable> partitionColumns = new
ArrayList<>(partitionExprs.size());
- for (Mutable<ILogicalExpression> pe : partitionExprs) {
- ILogicalExpression partExpr = pe.getValue();
- if (partExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
- throw new
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
winOp.getSourceLocation(),
- "Window partition/order expression has not been
normalized");
+ private IPhysicalOperator
visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op)
+ throws AlgebricksException {
+ ILogicalExpression unnestExpr = op.getExpressionRef().getValue();
+ if (unnestExpr.getExpressionTag() !=
LogicalExpressionTag.FUNCTION_CALL) {
+ throw new
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
op.getSourceLocation());
}
- LogicalVariable var = ((VariableReferenceExpression)
partExpr).getVariableReference();
- partitionColumns.add(var);
- }
- List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>>
orderExprs = winOp.getOrderExpressions();
- List<OrderColumn> orderColumns = new ArrayList<>(orderExprs.size());
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
orderExprs) {
- ILogicalExpression orderExpr = p.second.getValue();
- if (orderExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE)
{
- throw new
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
winOp.getSourceLocation(),
- "Window partition/order expression has not been
normalized");
+ AbstractFunctionCallExpression f =
(AbstractFunctionCallExpression) unnestExpr;
+ if
(!f.getFunctionIdentifier().equals(BuiltinFunctions.INDEX_SEARCH)) {
+ throw new
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
op.getSourceLocation());
+ }
+ AccessMethodJobGenParams jobGenParams = new
AccessMethodJobGenParams();
+ jobGenParams.readFromFuncArgs(f.getArguments());
+ MetadataProvider mp = (MetadataProvider)
context.getMetadataProvider();
+ DataSourceId dataSourceId =
+ new DataSourceId(jobGenParams.getDataverseName(),
jobGenParams.getDatasetName());
+ Dataset dataset = mp.findDataset(jobGenParams.getDataverseName(),
jobGenParams.getDatasetName());
+ IDataSourceIndex<String, DataSourceId> dsi =
+ mp.findDataSourceIndex(jobGenParams.getIndexName(),
dataSourceId);
+ INodeDomain storageDomain =
mp.findNodeDomain(dataset.getNodeGroupName());
+ if (dsi == null) {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR,
op.getSourceLocation(),
+ "Could not find index " + jobGenParams.getIndexName()
+ " for dataset " + dataSourceId);
+ }
+ IndexType indexType = jobGenParams.getIndexType();
+ boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
+ switch (indexType) {
+ case BTREE: {
+ BTreeJobGenParams btreeJobGenParams = new
BTreeJobGenParams();
+ btreeJobGenParams.readFromFuncArgs(f.getArguments());
+ return new BTreeSearchPOperator(dsi, storageDomain,
requiresBroadcast,
+ btreeJobGenParams.isPrimaryIndex(),
btreeJobGenParams.isEqCondition(),
+ btreeJobGenParams.getLowKeyVarList(),
btreeJobGenParams.getHighKeyVarList());
+ }
+ case RTREE: {
+ return new RTreeSearchPOperator(dsi, storageDomain,
requiresBroadcast);
+ }
+ case SINGLE_PARTITION_WORD_INVIX:
+ case SINGLE_PARTITION_NGRAM_INVIX: {
+ return new InvertedIndexPOperator(dsi, storageDomain,
requiresBroadcast, false);
+ }
+ case LENGTH_PARTITIONED_WORD_INVIX:
+ case LENGTH_PARTITIONED_NGRAM_INVIX: {
+ return new InvertedIndexPOperator(dsi, storageDomain,
requiresBroadcast, true);
+ }
+ default: {
+ throw AlgebricksException.create(
+
org.apache.hyracks.api.exceptions.ErrorCode.OPERATOR_NOT_IMPLEMENTED,
+ op.getSourceLocation(),
op.getOperatorTag().toString() + " with " + indexType + " index");
+ }
}
- LogicalVariable var = ((VariableReferenceExpression)
orderExpr).getVariableReference();
- orderColumns.add(new OrderColumn(var, p.first.getKind()));
}
- boolean partitionMaterialization = winOp.hasNestedPlans() ||
AnalysisUtil.hasFunctionWithProperty(winOp,
- BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
- boolean frameStartIsMonotonic =
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
- winOp.getFrameValueExpressions());
- boolean frameEndIsMonotonic =
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
- winOp.getFrameValueExpressions());
- boolean nestedTrivialAggregates = winOp.hasNestedPlans()
- &&
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
+ @Override
+ public WindowPOperator createWindowPOperator(WindowOperator winOp)
throws AlgebricksException {
+ boolean partitionMaterialization = winOp.hasNestedPlans() ||
AnalysisUtil.hasFunctionWithProperty(winOp,
+
BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION);
+ boolean frameStartIsMonotonic = AnalysisUtil
+
.isWindowFrameBoundaryMonotonic(winOp.getFrameStartExpressions(),
winOp.getFrameValueExpressions());
+ boolean frameEndIsMonotonic =
AnalysisUtil.isWindowFrameBoundaryMonotonic(winOp.getFrameEndExpressions(),
+ winOp.getFrameValueExpressions());
+ boolean nestedTrivialAggregates = winOp.hasNestedPlans()
+ &&
winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
- int memSizeInFrames =
context.getPhysicalOptimizationConfig().getMaxFramesForWindow();
-
- return new WindowPOperator(partitionColumns, partitionMaterialization,
orderColumns, frameStartIsMonotonic,
- frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames);
+ return new WindowPOperator(winOp.getPartitionVarList(),
partitionMaterialization,
+ winOp.getOrderColumnList(), frameStartIsMonotonic,
frameEndIsMonotonic, nestedTrivialAggregates,
+
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ }
}
-}
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
index 0235dad..db2290c 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WindowOperator.java
@@ -29,9 +29,12 @@ import
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
import
org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -375,4 +378,28 @@ public class WindowOperator extends
AbstractOperatorWithNestedPlans {
public boolean requiresVariableReferenceExpressions() {
return false;
}
+
+ public List<LogicalVariable> getPartitionVarList() {
+ List<LogicalVariable> varList = new
ArrayList<>(partitionExpressions.size());
+ for (Mutable<ILogicalExpression> pe : partitionExpressions) {
+ ILogicalExpression partExpr = pe.getValue();
+ if (partExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ LogicalVariable var = ((VariableReferenceExpression)
partExpr).getVariableReference();
+ varList.add(var);
+ }
+ }
+ return varList;
+ }
+
+ public List<OrderColumn> getOrderColumnList() {
+ List<OrderColumn> orderColumns = new
ArrayList<>(orderExpressions.size());
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p :
orderExpressions) {
+ ILogicalExpression orderExpr = p.second.getValue();
+ if (orderExpr.getExpressionTag() == LogicalExpressionTag.VARIABLE)
{
+ LogicalVariable var = ((VariableReferenceExpression)
orderExpr).getVariableReference();
+ orderColumns.add(new OrderColumn(var, p.first.getKind()));
+ }
+ }
+ return orderColumns;
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 1d5a7e9..49e5a0b 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -18,10 +18,10 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
-import static
org.apache.hyracks.api.exceptions.ErrorCode.ORDER_EXPR_NOT_NORMALIZED;
-
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -32,6 +32,7 @@ import
org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -42,17 +43,41 @@ import
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
@@ -88,10 +113,13 @@ import
org.apache.hyracks.algebricks.core.algebra.operators.physical.SubplanPOpe
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
+import
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import
org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+import org.apache.hyracks.api.exceptions.ErrorCode;
public class SetAlgebricksPhysicalOperatorsRule implements
IAlgebraicRewriteRule {
@@ -105,385 +133,476 @@ public class SetAlgebricksPhysicalOperatorsRule
implements IAlgebraicRewriteRule
public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator)
opRef.getValue();
- // if (context.checkIfInDontApplySet(this, op)) {
- // return false;
- // }
if (op.getPhysicalOperator() != null) {
return false;
}
-
- computeDefaultPhysicalOp(op, true, context);
- // context.addToDontApplySet(this, op);
+ computeDefaultPhysicalOp(op, true,
createPhysicalOperatorFactoryVisitor(context));
return true;
}
- private static void setPhysicalOperators(ILogicalPlan plan, boolean
topLevelOp, IOptimizationContext context)
- throws AlgebricksException {
- for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
root.getValue(), topLevelOp, context);
- }
- }
-
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op,
boolean topLevelOp,
- IOptimizationContext context) throws AlgebricksException {
- PhysicalOptimizationConfig physicalOptimizationConfig =
context.getPhysicalOptimizationConfig();
+ ILogicalOperatorVisitor<IPhysicalOperator, Boolean> physOpFactory)
throws AlgebricksException {
if (op.getPhysicalOperator() == null) {
- switch (op.getOperatorTag()) {
- case AGGREGATE: {
- op.setPhysicalOperator(new AggregatePOperator());
- break;
- }
- case ASSIGN: {
- op.setPhysicalOperator(new AssignPOperator());
- break;
- }
- case DISTINCT: {
- DistinctOperator distinct = (DistinctOperator) op;
- if (topLevelOp) {
- distinct.setPhysicalOperator(new
PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
- } else {
- distinct.setPhysicalOperator(
- new
MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
- }
- break;
- }
- case EMPTYTUPLESOURCE: {
- op.setPhysicalOperator(new EmptyTupleSourcePOperator());
- break;
- }
- case EXCHANGE: {
- if (op.getPhysicalOperator() == null) {
- throw new AlgebricksException("Implementation for
EXCHANGE operator was not set.");
- }
- // implem. choice for exchange should be set by a parent
op.
- break;
+ IPhysicalOperator physOp = op.accept(physOpFactory, topLevelOp);
+ if (physOp == null) {
+ throw
AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET,
op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+ op.setPhysicalOperator(physOp);
+ }
+ if (op.hasNestedPlans()) {
+ AbstractOperatorWithNestedPlans nested =
(AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan p : nested.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> root : p.getRoots()) {
+ computeDefaultPhysicalOp((AbstractLogicalOperator)
root.getValue(), false, physOpFactory);
}
- case GROUP: {
- GroupByOperator gby = (GroupByOperator) op;
-
- if (gby.getNestedPlans().size() == 1) {
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() == 1) {
- if
((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) ==
Boolean.TRUE)
- || (gby.getAnnotations()
-
.get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE)) {
- if (!topLevelOp) {
- throw new NotImplementedException(
- "External hash group-by for nested
grouping is not implemented.");
- }
-
- boolean hasIntermediateAgg =
generateMergeAggregationExpressions(gby, context);
- if (hasIntermediateAgg) {
- ExternalGroupByPOperator externalGby =
- new
ExternalGroupByPOperator(gby.getGroupByVarList(),
-
physicalOptimizationConfig.getMaxFramesForGroupBy(),
- (long)
physicalOptimizationConfig.getMaxFramesForGroupBy()
- *
physicalOptimizationConfig.getFrameSize());
- op.setPhysicalOperator(externalGby);
- break;
- }
- }
- }
- }
+ }
+ }
+ for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
+ computeDefaultPhysicalOp((AbstractLogicalOperator)
opRef.getValue(), topLevelOp, physOpFactory);
+ }
+ }
- if (topLevelOp) {
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(gby.getGroupByVarList(),
- gby.isGroupAll(),
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- } else {
- op.setPhysicalOperator(new
MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
-
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
- }
- break;
- }
- case INNERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp,
context);
- break;
- }
- case LEFTOUTERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op,
topLevelOp, context);
- break;
- }
- case LIMIT: {
- op.setPhysicalOperator(new StreamLimitPOperator());
- break;
- }
- case NESTEDTUPLESOURCE: {
- op.setPhysicalOperator(new NestedTupleSourcePOperator());
- break;
- }
- case ORDER: {
- OrderOperator oo = (OrderOperator) op;
- for (Pair<IOrder, Mutable<ILogicalExpression>> p :
oo.getOrderExpressions()) {
- ILogicalExpression e = p.second.getValue();
- if (e.getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
- throw
AlgebricksException.create(ORDER_EXPR_NOT_NORMALIZED, e.getSourceLocation());
- }
- }
- if (topLevelOp) {
- op.setPhysicalOperator(new StableSortPOperator(
-
physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
- } else {
- op.setPhysicalOperator(new
InMemoryStableSortPOperator());
- }
- break;
- }
- case PROJECT: {
- op.setPhysicalOperator(new StreamProjectPOperator());
- break;
- }
- case RUNNINGAGGREGATE: {
- op.setPhysicalOperator(new RunningAggregatePOperator());
- break;
- }
- case REPLICATE: {
- op.setPhysicalOperator(new ReplicatePOperator());
- break;
- }
- case SPLIT:
- op.setPhysicalOperator(new SplitPOperator());
- break;
- case SCRIPT: {
- op.setPhysicalOperator(new
StringStreamingScriptPOperator());
- break;
- }
- case SELECT: {
- op.setPhysicalOperator(new StreamSelectPOperator());
- break;
- }
- case SUBPLAN: {
- op.setPhysicalOperator(new SubplanPOperator());
- break;
- }
- case UNIONALL: {
- if (topLevelOp) {
- op.setPhysicalOperator(new UnionAllPOperator());
- } else {
- op.setPhysicalOperator(new MicroUnionAllPOperator());
- }
- break;
- }
- case INTERSECT: {
- if (topLevelOp) {
- op.setPhysicalOperator(new IntersectPOperator());
- } else {
- throw new IllegalStateException("Micro operator not
implemented for: " + op.getOperatorTag());
- }
- break;
- }
- case UNNEST: {
- op.setPhysicalOperator(new UnnestPOperator());
- break;
- }
- case LEFT_OUTER_UNNEST:
- op.setPhysicalOperator(new LeftOuterUnnestPOperator());
- break;
- case DATASOURCESCAN: {
- DataSourceScanOperator scan = (DataSourceScanOperator) op;
- IDataSource dataSource = scan.getDataSource();
- DataSourceScanPOperator dss = new
DataSourceScanPOperator(dataSource);
- if (dataSource.isScanAccessPathALeaf()) {
- dss.disableJobGenBelowMe();
- }
- op.setPhysicalOperator(dss);
- break;
- }
- case WRITE: {
- op.setPhysicalOperator(new SinkWritePOperator());
- break;
- }
- case DISTRIBUTE_RESULT: {
- op.setPhysicalOperator(new DistributeResultPOperator());
- break;
- }
- case WRITE_RESULT: {
- WriteResultOperator opLoad = (WriteResultOperator) op;
- LogicalVariable payload;
- List<LogicalVariable> keys = new
ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- payload = getKeysAndLoad(opLoad.getPayloadExpression(),
opLoad.getKeyExpressions(), keys);
- if (opLoad.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new
ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
- }
- op.setPhysicalOperator(
- new WriteResultPOperator(opLoad.getDataSource(),
payload, keys, additionalFilteringKeys));
- break;
- }
- case INSERT_DELETE_UPSERT: {
- // Primary index
- InsertDeleteUpsertOperator opLoad =
(InsertDeleteUpsertOperator) op;
- LogicalVariable payload;
- List<LogicalVariable> keys = new
ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- List<LogicalVariable> additionalNonFilterVariables = null;
- if (opLoad.getAdditionalNonFilteringExpressions() != null)
{
- additionalNonFilterVariables = new
ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalNonFilteringExpressions(),
additionalNonFilterVariables);
- }
- payload = getKeysAndLoad(opLoad.getPayloadExpression(),
opLoad.getPrimaryKeyExpressions(), keys);
- if (opLoad.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new
ArrayList<LogicalVariable>();
- getKeys(opLoad.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
- }
- if (opLoad.isBulkload()) {
- op.setPhysicalOperator(new BulkloadPOperator(payload,
keys, additionalFilteringKeys,
- additionalNonFilterVariables,
opLoad.getDataSource()));
- } else {
- op.setPhysicalOperator(new
InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
- opLoad.getDataSource(), opLoad.getOperation(),
additionalNonFilterVariables));
+ protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean>
createPhysicalOperatorFactoryVisitor(
+ IOptimizationContext context) {
+ return new AlgebricksPhysicalOperatorFactoryVisitor(context);
+ }
+
+ protected static class AlgebricksPhysicalOperatorFactoryVisitor
+ implements ILogicalOperatorVisitor<IPhysicalOperator, Boolean> {
+
+ protected final IOptimizationContext context;
+
+ protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+
+ protected
AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
+ this.context = context;
+ this.physicalOptimizationConfig =
context.getPhysicalOptimizationConfig();
+ }
+
+ @Override
+ public IPhysicalOperator visitAggregateOperator(AggregateOperator op,
Boolean topLevelOp) {
+ return new AggregatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitAssignOperator(AssignOperator op,
Boolean topLevelOp) {
+ return new AssignPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitDistinctOperator(DistinctOperator
distinct, Boolean topLevelOp) {
+ if (topLevelOp) {
+ return new
PreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+ } else {
+ return new
MicroPreSortedDistinctByPOperator(distinct.getDistinctByVarList());
+ }
+ }
+
+ @Override
+ public IPhysicalOperator
visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean topLevelOp) {
+ return new EmptyTupleSourcePOperator();
+ }
+
+ @Override
+ public final IPhysicalOperator visitGroupByOperator(GroupByOperator
gby, Boolean topLevelOp)
+ throws AlgebricksException {
+
+ ensureAllVariables(gby.getGroupByList(), Pair::getSecond);
+
+ if (gby.getNestedPlans().size() == 1 &&
gby.getNestedPlans().get(0).getRoots().size() == 1) {
+ if (topLevelOp &&
((gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) ==
Boolean.TRUE)
+ ||
(gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) ==
Boolean.TRUE))) {
+ ExternalGroupByPOperator extGby =
createExternalGroupByPOperator(gby);
+ if (extGby != null) {
+ return extGby;
}
- break;
}
- case INDEX_INSERT_DELETE_UPSERT: {
- // Secondary index
- IndexInsertDeleteUpsertOperator opInsDel =
(IndexInsertDeleteUpsertOperator) op;
- List<LogicalVariable> primaryKeys = new
ArrayList<LogicalVariable>();
- List<LogicalVariable> secondaryKeys = new
ArrayList<LogicalVariable>();
- List<LogicalVariable> additionalFilteringKeys = null;
- getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
- getKeys(opInsDel.getSecondaryKeyExpressions(),
secondaryKeys);
- if (opInsDel.getAdditionalFilteringExpressions() != null) {
- additionalFilteringKeys = new
ArrayList<LogicalVariable>();
- getKeys(opInsDel.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
- }
- if (opInsDel.isBulkload()) {
- op.setPhysicalOperator(
- new IndexBulkloadPOperator(primaryKeys,
secondaryKeys, additionalFilteringKeys,
- opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex()));
- } else {
- LogicalVariable upsertIndicatorVar = null;
- List<LogicalVariable> prevSecondaryKeys = null;
- LogicalVariable prevAdditionalFilteringKey = null;
- if (opInsDel.getOperation() == Kind.UPSERT) {
- upsertIndicatorVar =
getKey(opInsDel.getUpsertIndicatorExpr().getValue());
- prevSecondaryKeys = new
ArrayList<LogicalVariable>();
- getKeys(opInsDel.getPrevSecondaryKeyExprs(),
prevSecondaryKeys);
- if
(opInsDel.getPrevAdditionalFilteringExpression() != null) {
- prevAdditionalFilteringKey =
- ((VariableReferenceExpression)
(opInsDel.getPrevAdditionalFilteringExpression())
-
.getValue()).getVariableReference();
- }
- }
- op.setPhysicalOperator(new
IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys,
opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
- upsertIndicatorVar, prevSecondaryKeys,
prevAdditionalFilteringKey,
-
opInsDel.getNumberOfAdditionalNonFilteringFields()));
- }
- break;
+ }
- }
- case TOKENIZE: {
- TokenizeOperator opTokenize = (TokenizeOperator) op;
- List<LogicalVariable> primaryKeys = new
ArrayList<LogicalVariable>();
- List<LogicalVariable> secondaryKeys = new
ArrayList<LogicalVariable>();
- getKeys(opTokenize.getPrimaryKeyExpressions(),
primaryKeys);
- getKeys(opTokenize.getSecondaryKeyExpressions(),
secondaryKeys);
- // Tokenize Operator only operates with a bulk load on a
data set with an index
- if (opTokenize.isBulkload()) {
- op.setPhysicalOperator(
- new TokenizePOperator(primaryKeys,
secondaryKeys, opTokenize.getDataSourceIndex()));
+ if (topLevelOp) {
+ return new
PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
+
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ } else {
+ return new
MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
+
context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+ }
+ }
+
+ protected ExternalGroupByPOperator
createExternalGroupByPOperator(GroupByOperator gby)
+ throws AlgebricksException {
+ boolean hasIntermediateAgg =
generateMergeAggregationExpressions(gby);
+ if (!hasIntermediateAgg) {
+ return null;
+ }
+ return new ExternalGroupByPOperator(gby.getGroupByVarList(),
+ physicalOptimizationConfig.getMaxFramesForGroupBy(),
+ (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+ * physicalOptimizationConfig.getFrameSize());
+ }
+
+ @Override
+ public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+ return op.getPhysicalOperator();
+ }
+
+ @Override
+ public IPhysicalOperator
visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
+ throws AlgebricksException {
+ JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
+ return op.getPhysicalOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitLimitOperator(LimitOperator op, Boolean
topLevelOp) {
+ return new StreamLimitPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator
visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean
topLevelOp) {
+ return new NestedTupleSourcePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean
topLevelOp) throws AlgebricksException {
+ ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
+ if (topLevelOp) {
+ return new
StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(),
oo.getTopK());
+ } else {
+ return new InMemoryStableSortPOperator();
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitProjectOperator(ProjectOperator op,
Boolean topLevelOp) {
+ return new StreamProjectPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator
visitRunningAggregateOperator(RunningAggregateOperator op, Boolean topLevelOp) {
+ return new RunningAggregatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitReplicateOperator(ReplicateOperator op,
Boolean topLevelOp) {
+ return new ReplicatePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSplitOperator(SplitOperator op, Boolean
topLevelOp) {
+ return new SplitPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitScriptOperator(ScriptOperator op,
Boolean topLevelOp) {
+ return new StringStreamingScriptPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSelectOperator(SelectOperator op,
Boolean topLevelOp) {
+ return new StreamSelectPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitSubplanOperator(SubplanOperator op,
Boolean topLevelOp) {
+ return new SubplanPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitUnionOperator(UnionAllOperator op,
Boolean topLevelOp) {
+ if (topLevelOp) {
+ return new UnionAllPOperator();
+ } else {
+ return new MicroUnionAllPOperator();
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitIntersectOperator(IntersectOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ if (topLevelOp) {
+ return new IntersectPOperator();
+ } else {
+ throw
AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED,
op.getSourceLocation(),
+ op.getOperatorTag().toString() + " (micro)");
+ }
+ }
+
+ @Override
+ public IPhysicalOperator visitUnnestOperator(UnnestOperator op,
Boolean topLevelOp) {
+ return new UnnestPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator
visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean topLevelOp) {
+ return new LeftOuterUnnestPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitDataScanOperator(DataSourceScanOperator
scan, Boolean topLevelOp) {
+ IDataSource dataSource = scan.getDataSource();
+ DataSourceScanPOperator dss = new
DataSourceScanPOperator(dataSource);
+ if (dataSource.isScanAccessPathALeaf()) {
+ dss.disableJobGenBelowMe();
+ }
+ return dss;
+ }
+
+ @Override
+ public IPhysicalOperator visitWriteOperator(WriteOperator op, Boolean
topLevelOp) {
+ return new SinkWritePOperator();
+ }
+
+ @Override
+ public IPhysicalOperator
visitDistributeResultOperator(DistributeResultOperator op, Boolean topLevelOp) {
+ return new DistributeResultPOperator();
+ }
+
+ @Override
+ public IPhysicalOperator visitWriteResultOperator(WriteResultOperator
opLoad, Boolean topLevelOp) {
+ List<LogicalVariable> keys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ LogicalVariable payload =
getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
+ if (opLoad.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opLoad.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
+ }
+ return new WriteResultPOperator(opLoad.getDataSource(), payload,
keys, additionalFilteringKeys);
+ }
+
+ @Override
+ public IPhysicalOperator
visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator opLoad,
+ Boolean topLevelOp) {
+ // Primary index
+ List<LogicalVariable> keys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ List<LogicalVariable> additionalNonFilterVariables = null;
+ if (opLoad.getAdditionalNonFilteringExpressions() != null) {
+ additionalNonFilterVariables = new ArrayList<>();
+ getKeys(opLoad.getAdditionalNonFilteringExpressions(),
additionalNonFilterVariables);
+ }
+ LogicalVariable payload =
+ getKeysAndLoad(opLoad.getPayloadExpression(),
opLoad.getPrimaryKeyExpressions(), keys);
+ if (opLoad.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opLoad.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
+ }
+ if (opLoad.isBulkload()) {
+ return new BulkloadPOperator(payload, keys,
additionalFilteringKeys, additionalNonFilterVariables,
+ opLoad.getDataSource());
+ } else {
+ return new InsertDeleteUpsertPOperator(payload, keys,
additionalFilteringKeys, opLoad.getDataSource(),
+ opLoad.getOperation(), additionalNonFilterVariables);
+ }
+ }
+
+ @Override
+ public IPhysicalOperator
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator opInsDel,
+ Boolean topLevelOp) {
+ // Secondary index
+ List<LogicalVariable> primaryKeys = new ArrayList<>();
+ List<LogicalVariable> secondaryKeys = new ArrayList<>();
+ List<LogicalVariable> additionalFilteringKeys = null;
+ getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
+ getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
+ if (opInsDel.getAdditionalFilteringExpressions() != null) {
+ additionalFilteringKeys = new ArrayList<>();
+ getKeys(opInsDel.getAdditionalFilteringExpressions(),
additionalFilteringKeys);
+ }
+ if (opInsDel.isBulkload()) {
+ return new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
additionalFilteringKeys,
+ opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex());
+ } else {
+ LogicalVariable upsertIndicatorVar = null;
+ List<LogicalVariable> prevSecondaryKeys = null;
+ LogicalVariable prevAdditionalFilteringKey = null;
+ if (opInsDel.getOperation() == Kind.UPSERT) {
+ upsertIndicatorVar =
getKey(opInsDel.getUpsertIndicatorExpr().getValue());
+ prevSecondaryKeys = new ArrayList<>();
+ getKeys(opInsDel.getPrevSecondaryKeyExprs(),
prevSecondaryKeys);
+ if (opInsDel.getPrevAdditionalFilteringExpression() !=
null) {
+ prevAdditionalFilteringKey =
+ ((VariableReferenceExpression)
(opInsDel.getPrevAdditionalFilteringExpression())
+ .getValue()).getVariableReference();
}
- break;
}
- case SINK: {
- op.setPhysicalOperator(new SinkPOperator());
- break;
- }
- case FORWARD:
- op.setPhysicalOperator(new SortForwardPOperator());
- break;
+ return new IndexInsertDeleteUpsertPOperator(primaryKeys,
secondaryKeys, additionalFilteringKeys,
+ opInsDel.getFilterExpression(),
opInsDel.getDataSourceIndex(), upsertIndicatorVar,
+ prevSecondaryKeys, prevAdditionalFilteringKey,
+ opInsDel.getNumberOfAdditionalNonFilteringFields());
}
}
- if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans nested =
(AbstractOperatorWithNestedPlans) op;
- for (ILogicalPlan p : nested.getNestedPlans()) {
- setPhysicalOperators(p, false, context);
+
+ @Override
+ public IPhysicalOperator visitTokenizeOperator(TokenizeOperator
opTokenize, Boolean topLevelOp)
+ throws AlgebricksException {
+ List<LogicalVariable> primaryKeys = new ArrayList<>();
+ List<LogicalVariable> secondaryKeys = new ArrayList<>();
+ getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
+ getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
+ // Tokenize Operator only operates with a bulk load on a data set
with an index
+ if (!opTokenize.isBulkload()) {
+ throw
AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED,
opTokenize.getSourceLocation(),
+ opTokenize.getOperatorTag().toString() + " (no
bulkload)");
}
+ return new TokenizePOperator(primaryKeys, secondaryKeys,
opTokenize.getDataSourceIndex());
}
- for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
opRef.getValue(), topLevelOp, context);
+
+ @Override
+ public IPhysicalOperator visitSinkOperator(SinkOperator op, Boolean
topLevelOp) {
+ return new SinkPOperator();
}
- }
- private static void getKeys(List<Mutable<ILogicalExpression>>
keyExpressions, List<LogicalVariable> keys) {
- for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
- keys.add(getKey(kExpr.getValue()));
+ @Override
+ public IPhysicalOperator visitForwardOperator(ForwardOperator op,
Boolean topLevelOp) {
+ return new SortForwardPOperator();
}
- }
- private static LogicalVariable getKey(ILogicalExpression keyExpression) {
- if (keyExpression.getExpressionTag() != LogicalExpressionTag.VARIABLE)
{
- throw new NotImplementedException();
+ @Override
+ public final IPhysicalOperator visitWindowOperator(WindowOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ ensureAllVariables(op.getPartitionExpressions(), v -> v);
+ ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
+ return createWindowPOperator(op);
}
- return ((VariableReferenceExpression)
keyExpression).getVariableReference();
- }
- private static LogicalVariable getKeysAndLoad(Mutable<ILogicalExpression>
payloadExpr,
- List<Mutable<ILogicalExpression>> keyExpressions,
List<LogicalVariable> keys) {
- LogicalVariable payload;
- if (payloadExpr.getValue().getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
- throw new NotImplementedException();
+ protected WindowPOperator createWindowPOperator(WindowOperator op)
throws AlgebricksException {
+ return new WindowPOperator(op.getPartitionVarList(), true,
op.getOrderColumnList(), false, false, false,
+
context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+ }
+
+ // Physical operators for these operators must have been set already
by rules that introduced them
+
+ @Override
+ public IPhysicalOperator visitDelegateOperator(DelegateOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET,
op.getSourceLocation(),
+ op.getOperatorTag());
}
- payload = ((VariableReferenceExpression)
payloadExpr.getValue()).getVariableReference();
- for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
- ILogicalExpression e = kExpr.getValue();
- if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ @Override
+ public IPhysicalOperator visitExchangeOperator(ExchangeOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET,
op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ @Override
+ public IPhysicalOperator visitMaterializeOperator(MaterializeOperator
op, Boolean topLevelOp)
+ throws AlgebricksException {
+ throw AlgebricksException.create(ErrorCode.PHYS_OPERATOR_NOT_SET,
op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ // Physical operators for these operators cannot be instantiated by
Algebricks
+
+ @Override
+ public IPhysicalOperator visitUnnestMapOperator(UnnestMapOperator op,
Boolean topLevelOp)
+ throws AlgebricksException {
+ throw
AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED,
op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ @Override
+ public IPhysicalOperator
visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean
topLevelOp)
+ throws AlgebricksException {
+ throw
AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED,
op.getSourceLocation(),
+ op.getOperatorTag());
+ }
+
+ // Helper methods
+
+ private static void getKeys(List<Mutable<ILogicalExpression>>
keyExpressions, List<LogicalVariable> keys) {
+ for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+ keys.add(getKey(kExpr.getValue()));
+ }
+ }
+
+ private static LogicalVariable getKey(ILogicalExpression
keyExpression) {
+ if (keyExpression.getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
throw new NotImplementedException();
}
- keys.add(((VariableReferenceExpression) e).getVariableReference());
+ return ((VariableReferenceExpression)
keyExpression).getVariableReference();
}
- return payload;
- }
- private static boolean generateMergeAggregationExpressions(GroupByOperator
gby, IOptimizationContext context)
- throws AlgebricksException {
- if (gby.getNestedPlans().size() != 1) {
- //External/Sort group-by currently works only for one nested plan
with one root containing
- //an aggregate and a nested-tuple-source.
- throw new AlgebricksException(
- "External group-by currently works only for one nested
plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- ILogicalPlan p0 = gby.getNestedPlans().get(0);
- if (p0.getRoots().size() != 1) {
- //External/Sort group-by currently works only for one nested plan
with one root containing
- //an aggregate and a nested-tuple-source.
- throw new AlgebricksException(
- "External group-by currently works only for one nested
plan with one root containing"
- + "an aggregate and a nested-tuple-source.");
- }
- IMergeAggregationExpressionFactory mergeAggregationExpressionFactory =
- context.getMergeAggregationExpressionFactory();
- Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
- AbstractLogicalOperator r0Logical = (AbstractLogicalOperator)
r0.getValue();
- if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
- return false;
+ private static LogicalVariable
getKeysAndLoad(Mutable<ILogicalExpression> payloadExpr,
+ List<Mutable<ILogicalExpression>> keyExpressions,
List<LogicalVariable> keys) {
+ LogicalVariable payload;
+ if (payloadExpr.getValue().getExpressionTag() !=
LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException();
+ }
+ payload = ((VariableReferenceExpression)
payloadExpr.getValue()).getVariableReference();
+
+ for (Mutable<ILogicalExpression> kExpr : keyExpressions) {
+ ILogicalExpression e = kExpr.getValue();
+ if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException();
+ }
+ keys.add(((VariableReferenceExpression)
e).getVariableReference());
+ }
+ return payload;
}
- // Check whether there are multiple aggregates in the sub plan.
- ILogicalOperator r1Logical = r0Logical;
- while (r1Logical.hasInputs()) {
- r1Logical = r1Logical.getInputs().get(0).getValue();
- if (r1Logical.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ private boolean generateMergeAggregationExpressions(GroupByOperator
gby) throws AlgebricksException {
+ if (gby.getNestedPlans().size() != 1) {
+ //External/Sort group-by currently works only for one nested
plan with one root containing
+ //an aggregate and a nested-tuple-source.
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested
plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ ILogicalPlan p0 = gby.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ //External/Sort group-by currently works only for one nested
plan with one root containing
+ //an aggregate and a nested-tuple-source.
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested
plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ IMergeAggregationExpressionFactory
mergeAggregationExpressionFactory =
+ context.getMergeAggregationExpressionFactory();
+ Mutable<ILogicalOperator> r0 = p0.getRoots().get(0);
+ AbstractLogicalOperator r0Logical = (AbstractLogicalOperator)
r0.getValue();
+ if (r0Logical.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
return false;
}
+
+ // Check whether there are multiple aggregates in the sub plan.
+ ILogicalOperator r1Logical = r0Logical;
+ while (r1Logical.hasInputs()) {
+ r1Logical = r1Logical.getInputs().get(0).getValue();
+ if (r1Logical.getOperatorTag() ==
LogicalOperatorTag.AGGREGATE) {
+ return false;
+ }
+ }
+
+ AggregateOperator aggOp = (AggregateOperator) r0.getValue();
+ List<Mutable<ILogicalExpression>> aggFuncRefs =
aggOp.getExpressions();
+ List<LogicalVariable> originalAggVars = aggOp.getVariables();
+ int n = aggOp.getExpressions().size();
+ List<Mutable<ILogicalExpression>> mergeExpressionRefs = new
ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ ILogicalExpression mergeExpr =
mergeAggregationExpressionFactory
+ .createMergeAggregation(originalAggVars.get(i),
aggFuncRefs.get(i).getValue(), context);
+ if (mergeExpr == null) {
+ return false;
+ }
+ mergeExpressionRefs.add(new MutableObject<>(mergeExpr));
+ }
+ aggOp.setMergeExpressions(mergeExpressionRefs);
+ return true;
}
- AggregateOperator aggOp = (AggregateOperator) r0.getValue();
- List<Mutable<ILogicalExpression>> aggFuncRefs = aggOp.getExpressions();
- List<LogicalVariable> originalAggVars = aggOp.getVariables();
- int n = aggOp.getExpressions().size();
- List<Mutable<ILogicalExpression>> mergeExpressionRefs = new
ArrayList<Mutable<ILogicalExpression>>();
- for (int i = 0; i < n; i++) {
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
- .createMergeAggregation(originalAggVars.get(i),
aggFuncRefs.get(i).getValue(), context);
- if (mergeExpr == null) {
- return false;
+ static <E> void ensureAllVariables(Collection<E> exprList, Function<E,
Mutable<ILogicalExpression>> accessor)
+ throws AlgebricksException {
+ for (E item : exprList) {
+ ILogicalExpression e = accessor.apply(item).getValue();
+ if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw
AlgebricksException.create(ErrorCode.EXPR_NOT_NORMALIZED,
e.getSourceLocation());
+ }
}
- mergeExpressionRefs.add(new
MutableObject<ILogicalExpression>(mergeExpr));
}
- aggOp.setMergeExpressions(mergeExpressionRefs);
- return true;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index bf34664..a31aef2 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -161,7 +161,8 @@ public class ErrorCode {
public static final int CANNOT_COMPOSE_PART_CONSTRAINTS = 10001;
public static final int PHYS_OPERATOR_NOT_SET = 10002;
public static final int DESCRIPTOR_GENERATION_ERROR = 10003;
- public static final int ORDER_EXPR_NOT_NORMALIZED = 10004;
+ public static final int EXPR_NOT_NORMALIZED = 10004;
+ public static final int OPERATOR_NOT_IMPLEMENTED = 10005;
private static class Holder {
private static final Map<Integer, String> errorMessageMap;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b4f7973..8e3b85e 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,4 +143,5 @@
10001 = Cannot compose partition constraint %1$s with %2$s
10002 = Physical operator not set for operator: %1$s
10003 = Could not generate operator descriptor for operator %1$s
-10004 = Order expression has not been normalized
+10004 = Expression has not been normalized
+10005 = Operator is not implemented: %1$s