This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit a4bf1451de142c30caa653db238d104894786d3a Author: Dmitry Lychagin <[email protected]> AuthorDate: Fri Feb 12 12:09:36 2021 -0800 [NO ISSUE][COMP] Support batch assign for external functions - user model changes: no - storage format changes: no - interface changes: no Details: - Add compiler support for batch invocation of external functions Change-Id: I1ed1f5c51628d996327de843f4977d083e9b4bd4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10006 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- .../operators/physical/AssignBatchPOperator.java | 103 ++++++++++ .../jobgen/QueryLogicalExpressionJobGen.java | 13 +- .../asterix/optimizer/base/RuleCollections.java | 6 +- .../ExtractBatchableExternalFunctionCallsRule.java | 220 +++++++++++++++++++++ .../rules/SetAsterixPhysicalOperatorsRule.java | 61 ++++++ .../apache/asterix/api/common/APIFramework.java | 4 +- .../ExternalFunctionDescriptorProvider.java | 19 +- .../library/ExternalScalarFunctionDescriptor.java | 14 +- .../ExternalAssignBatchRuntimeFactory.java | 52 +++++ .../functions/ExternalFunctionCompilerUtil.java | 20 ++ .../om/functions/IExternalFunctionDescriptor.java | 29 +++ .../core/algebra/base/PhysicalOperatorTag.java | 2 +- ...POperator.java => AbstractAssignPOperator.java} | 36 ++-- .../operators/physical/AssignPOperator.java | 84 +------- .../core/algebra/plan/PlanStabilityVerifier.java | 16 +- .../algebra/util/OperatorManipulationUtil.java | 16 +- .../rewriter/rules/ConsolidateAssignsRule.java | 9 + .../rules/SetAlgebricksPhysicalOperatorsRule.java | 2 +- 18 files changed, 573 insertions(+), 133 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java new file mode 
100644 index 0000000..0ec5ee7 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/AssignBatchPOperator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.algebra.operators.physical; + +import java.util.List; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.external.library.ExternalFunctionDescriptorProvider; +import org.apache.asterix.external.operators.ExternalAssignBatchRuntimeFactory; +import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil; +import org.apache.asterix.om.functions.IExternalFunctionDescriptor; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; +import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.api.exceptions.SourceLocation; + +public final class AssignBatchPOperator extends AbstractAssignPOperator { + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.ASSIGN_BATCH; + } + + @Override + protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList) + throws AlgebricksException { + IVariableTypeEnvironment inputTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue()); + List<Mutable<ILogicalExpression>> exprList = op.getExpressions(); + int exprCount = exprList.size(); + IExternalFunctionDescriptor[] fnDescs = new IExternalFunctionDescriptor[exprCount]; + int[][] fnArgColumns = new int[exprCount][]; + for (int i = 0; i < exprCount; i++) { + Mutable<ILogicalExpression> exprRef = exprList.get(i); + ILogicalExpression expr = exprRef.getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(), + String.valueOf(expr.getExpressionTag())); + } + AbstractFunctionCallExpression callExpr = 
(AbstractFunctionCallExpression) expr; + IFunctionInfo fi = callExpr.getFunctionInfo(); + if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), fi)) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(), + fi.toString()); + } + fnDescs[i] = ExternalFunctionDescriptorProvider.resolveExternalFunction(callExpr, inputTypeEnv, context); + fnArgColumns[i] = getColumns(callExpr.getArguments(), opSchema, op.getSourceLocation()); + } + + return new ExternalAssignBatchRuntimeFactory(outColumns, fnDescs, fnArgColumns, projectionList); + } + + private int[] getColumns(List<Mutable<ILogicalExpression>> exprList, IOperatorSchema opSchema, + SourceLocation sourceLoc) throws CompilationException { + int n = exprList.size(); + int[] columns = new int[n]; + for (int i = 0; i < n; i++) { + ILogicalExpression expr = exprList.get(i).getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, + String.valueOf(expr.getExpressionTag())); + } + VariableReferenceExpression argVarRef = (VariableReferenceExpression) expr; + LogicalVariable argVar = argVarRef.getVariableReference(); + int argColumn = opSchema.findVariable(argVar); + if (argColumn < 0) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, String.valueOf(argVar)); + } + columns[i] = argColumn; + } + return columns; + } +} diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java index 74656e4..2a662d0 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/jobgen/QueryLogicalExpressionJobGen.java @@ -30,7 +30,6 @@ import 
org.apache.asterix.om.functions.IExternalFunctionInfo; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionManager; import org.apache.asterix.om.functions.IFunctionTypeInferer; -import org.apache.asterix.runtime.functions.FunctionTypeInferers; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -138,15 +137,9 @@ public class QueryLogicalExpressionJobGen implements ILogicalExpressionJobGen { IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { IScalarEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context); - IFunctionDescriptor fd = null; - if (expr.getFunctionInfo() instanceof IExternalFunctionInfo) { - fd = ExternalFunctionDescriptorProvider - .getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo()); - CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties(); - FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, env, props); - } else { - fd = resolveFunction(expr, env, context); - } + IFunctionDescriptor fd = expr.getFunctionInfo() instanceof IExternalFunctionInfo + ? 
ExternalFunctionDescriptorProvider.resolveExternalFunction(expr, env, context) + : resolveFunction(expr, env, context); return fd.createEvaluatorFactory(args); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java index 4a2b629..d2fe2f5 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java @@ -40,6 +40,7 @@ import org.apache.asterix.optimizer.rules.CheckInsertUpsertReturningRule; import org.apache.asterix.optimizer.rules.ConstantFoldingRule; import org.apache.asterix.optimizer.rules.CountVarToCountOneRule; import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule; +import org.apache.asterix.optimizer.rules.ExtractBatchableExternalFunctionCallsRule; import org.apache.asterix.optimizer.rules.ExtractDistinctByExpressionsRule; import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule; import org.apache.asterix.optimizer.rules.ExtractWindowExpressionsRule; @@ -284,7 +285,7 @@ public final class RuleCollections { public static final List<IAlgebraicRewriteRule> buildConsolidationRuleCollection() { List<IAlgebraicRewriteRule> consolidation = new LinkedList<>(); consolidation.add(new ConsolidateSelectsRule()); - consolidation.add(new ConsolidateAssignsRule()); + consolidation.add(new ConsolidateAssignsRule(false)); consolidation.add(new InlineAssignIntoAggregateRule()); consolidation.add(new RewriteDistinctAggregateRule()); // The following rule should run after RewriteDistinctAggregateRule @@ -353,6 +354,7 @@ public final class RuleCollections { public static final List<IAlgebraicRewriteRule> buildPhysicalRewritesAllLevelsRuleCollection() { List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>(); 
physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin()); + physicalRewritesAllLevels.add(new ExtractBatchableExternalFunctionCallsRule()); //Turned off the following rule for now not to change OptimizerTest results. physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule()); physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule()); @@ -369,7 +371,7 @@ public final class RuleCollections { physicalRewritesAllLevels.add(new IntroduceMaterializationForInsertWithSelfScanRule()); physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule()); physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule()); - physicalRewritesAllLevels.add(new ConsolidateAssignsRule()); + physicalRewritesAllLevels.add(new ConsolidateAssignsRule(true)); // After adding projects, we may need need to set physical operators again. physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule()); return physicalRewritesAllLevels; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java new file mode 100644 index 0000000..1e5d805 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ExtractBatchableExternalFunctionCallsRule.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.optimizer.rules; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; +import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.api.exceptions.SourceLocation; + +public class ExtractBatchableExternalFunctionCallsRule implements IAlgebraicRewriteRule { + + private final ExtractFunctionCallsVisitor extractVisitor = new ExtractFunctionCallsVisitor(); + + private Boolean isRuleEnabled; + + @Override + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { + return false; + } + + @Override + public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { + if (isRuleEnabled == null) { + isRuleEnabled = SetAsterixPhysicalOperatorsRule.isBatchAssignEnabled(context); + } + if (!isRuleEnabled) { + return false; + } + + AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + List<Mutable<ILogicalExpression>> assignTopExprRefs = Collections.emptyList(); + switch (op.getOperatorTag()) { + case ASSIGN: + assignTopExprRefs = ((AssignOperator) op).getExpressions(); + break; + case SELECT: + break; + default: + return false; + } + + if (context.checkIfInDontApplySet(this, op)) { + return false; + } + context.addToDontApplySet(this, op); + + extractVisitor.reset(context, assignTopExprRefs); + if (!op.acceptExpressionTransform(extractVisitor)) { + return false; + } + SourceLocation sourceLoc = op.getSourceLocation(); + + ILogicalOperator inputOp = op.getInputs().get(0).getValue(); + for (int i = 0, ln = extractVisitor.assignVars.size(); i < ln; i++) { + List<LogicalVariable> assignVarList = extractVisitor.assignVars.get(i); + List<Mutable<ILogicalExpression>> assignExprList = extractVisitor.assignExprs.get(i); + AssignOperator assignOp = new AssignOperator(assignVarList, assignExprList); + assignOp.setSourceLocation(sourceLoc); + assignOp.getInputs().add(new MutableObject<>(inputOp)); + 
context.computeAndSetTypeEnvironmentForOperator(assignOp); + assignOp.recomputeSchema(); + OperatorPropertiesUtil.markMovable(assignOp, false); + + context.addToDontApplySet(this, assignOp); + for (LogicalVariable assignVar : assignVarList) { + context.addNotToBeInlinedVar(assignVar); + } + + inputOp = assignOp; + } + + op.getInputs().clear(); + op.getInputs().add(new MutableObject<>(inputOp)); + context.computeAndSetTypeEnvironmentForOperator(op); + op.recomputeSchema(); + return true; + } + + private static final class ExtractFunctionCallsVisitor implements ILogicalExpressionReferenceTransform { + + private final List<List<LogicalVariable>> assignVars = new ArrayList<>(); + + private final List<List<Mutable<ILogicalExpression>>> assignExprs = new ArrayList<>(); + + private final List<LogicalVariable> usedVarList = new ArrayList<>(); + + private IOptimizationContext context; + + private List<Mutable<ILogicalExpression>> dontExtractFromExprRefs; + + public void reset(IOptimizationContext context, List<Mutable<ILogicalExpression>> dontExtractFromExprRefs) { + this.context = context; + this.dontExtractFromExprRefs = dontExtractFromExprRefs; + assignVars.clear(); + assignExprs.clear(); + } + + @Override + public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException { + ILogicalExpression expr = exprRef.getValue(); + switch (expr.getExpressionTag()) { + case FUNCTION_CALL: + AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr; + boolean applied = false; + for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) { + applied |= transform(argRef); + } + AbstractFunctionCallExpression.FunctionKind fnKind = callExpr.getKind(); + IFunctionInfo fnInfo = callExpr.getFunctionInfo(); + if (ExternalFunctionCompilerUtil.supportsBatchInvocation(fnKind, fnInfo) + && callExpr.isFunctional()) { + // need to extract non-variable arguments into separate ASSIGNS + // because batched assign can only operate on columns + for 
(Mutable<ILogicalExpression> argRef : callExpr.getArguments()) { + ILogicalExpression argExpr = argRef.getValue(); + if (argExpr.getExpressionTag() != LogicalExpressionTag.VARIABLE) { + LogicalVariable newArgVar = context.newVar(); + VariableReferenceExpression newArgVarRef = new VariableReferenceExpression(newArgVar); + newArgVarRef.setSourceLocation(expr.getSourceLocation()); + saveAssignVar(newArgVar, argExpr); + argRef.setValue(newArgVarRef); + applied = true; + } + } + // need extract function call itself into a separate ASSIGN + // (unless it's already a top level expression of the ASSIGN operator we're visiting) + boolean dontExtractExprRef = indexOf(dontExtractFromExprRefs, exprRef) >= 0; + if (!dontExtractExprRef) { + LogicalVariable newVar = context.newVar(); + VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar); + newVarRef.setSourceLocation(expr.getSourceLocation()); + saveAssignVar(newVar, expr); + exprRef.setValue(newVarRef); + applied = true; + } + } + return applied; + case VARIABLE: + case CONSTANT: + return false; + default: + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expr.getSourceLocation(), + expr.getExpressionTag().toString()); + } + } + + private void saveAssignVar(LogicalVariable var, ILogicalExpression expr) { + List<LogicalVariable> assignVarList = null; + List<Mutable<ILogicalExpression>> assignExprList = null; + + if (!assignVars.isEmpty()) { + usedVarList.clear(); + expr.getUsedVariables(usedVarList); + for (int i = 0, ln = assignVars.size(); i < ln; i++) { + List<LogicalVariable> candidateVarList = assignVars.get(i); + if (OperatorPropertiesUtil.disjoint(candidateVarList, usedVarList)) { + assignVarList = candidateVarList; + assignExprList = assignExprs.get(i); + break; + } + } + } + + if (assignVarList == null) { + // first time, or couldn't find a disjoint var list + assignVarList = new ArrayList<>(); + assignExprList = new ArrayList<>(); + assignVars.add(assignVarList); + 
assignExprs.add(assignExprList); + } + + assignVarList.add(var); + assignExprList.add(new MutableObject<>(expr)); + } + + public static int indexOf(List<Mutable<ILogicalExpression>> exprList, Mutable<ILogicalExpression> exprRef) { + return OperatorManipulationUtil.indexOf(exprList, + (listItemExprRef, paramExprRef) -> listItemExprRef == paramExprRef, exprRef); + } + } +} diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index e662737..ea51fc3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -21,6 +21,7 @@ package org.apache.asterix.optimizer.rules; import java.util.ArrayList; import java.util.List; +import org.apache.asterix.algebra.operators.physical.AssignBatchPOperator; import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator; import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator; import org.apache.asterix.algebra.operators.physical.RTreeSearchPOperator; @@ -30,6 +31,7 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.metadata.declared.DataSourceId; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.functions.ExternalFunctionCompilerUtil; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.optimizer.base.AnalysisUtil; import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams; @@ -49,10 +51,12 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; import 
org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; @@ -69,16 +73,47 @@ import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperato public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysicalOperatorsRule { + // Disable ASSIGN_BATCH physical operator if this option is set to 'false' + public static final String REWRITE_ATTEMPT_BATCH_ASSIGN = "rewrite_attempt_batch_assign"; + static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = false; + @Override protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor( IOptimizationContext context) { return new AsterixPhysicalOperatorFactoryVisitor(context); } + static boolean isBatchAssignEnabled(IOptimizationContext context) { + MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); + return metadataProvider.getBooleanProperty(REWRITE_ATTEMPT_BATCH_ASSIGN, REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT); + } + private static class AsterixPhysicalOperatorFactoryVisitor extends 
AlgebricksPhysicalOperatorFactoryVisitor { + private final boolean isBatchAssignEnabled; + private AsterixPhysicalOperatorFactoryVisitor(IOptimizationContext context) { super(context); + isBatchAssignEnabled = isBatchAssignEnabled(context); + } + + @Override + public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException { + List<Mutable<ILogicalExpression>> exprList = op.getExpressions(); + boolean batchMode = isBatchAssignEnabled && exprList.size() > 0 && allBatchableFunctionCalls(exprList); + if (batchMode) { + // disable inlining of variable arguments + for (Mutable<ILogicalExpression> exprRef : exprList) { + AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) exprRef.getValue(); + for (Mutable<ILogicalExpression> argRef : callExpr.getArguments()) { + LogicalVariable var = ((VariableReferenceExpression) argRef.getValue()).getVariableReference(); + context.addNotToBeInlinedVar(var); + } + } + return new AssignBatchPOperator(); + } else { + return super.visitAssignOperator(op, topLevelOp); + } } @Override @@ -280,5 +315,31 @@ public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysical return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList()); } } + + private boolean allBatchableFunctionCalls(List<Mutable<ILogicalExpression>> exprList) + throws CompilationException { + for (Mutable<ILogicalExpression> exprRef : exprList) { + if (!isBatchableFunctionCall(exprRef.getValue())) { + return false; + } + } + return true; + } + + private static boolean isBatchableFunctionCall(ILogicalExpression expr) throws CompilationException { + if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + return false; + } + AbstractFunctionCallExpression callExpr = (AbstractFunctionCallExpression) expr; + if (!ExternalFunctionCompilerUtil.supportsBatchInvocation(callExpr.getKind(), callExpr.getFunctionInfo())) { + return false; + } + for 
(Mutable<ILogicalExpression> argRef : callExpr.getArguments()) { + if (argRef.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) { + return false; + } + } + return true; + } } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 9088db6..d60047e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -72,6 +72,7 @@ import org.apache.asterix.om.base.IAObject; import org.apache.asterix.optimizer.base.AsterixOptimizationContext; import org.apache.asterix.optimizer.base.FuzzyUtils; import org.apache.asterix.optimizer.rules.DisjunctivePredicateToJoinRule; +import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.ExecutionPlans; @@ -141,7 +142,8 @@ public class APIFramework { StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", - DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION); + DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION, + SetAsterixPhysicalOperatorsRule.REWRITE_ATTEMPT_BATCH_ASSIGN); private final IRewriterFactory rewriterFactory; private final IAstPrintVisitorFactory astPrintVisitorFactory; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java index 8ac507f..dc51c5f 100755 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionDescriptorProvider.java @@ -18,15 +18,30 @@ */ package org.apache.asterix.external.library; +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.om.functions.IExternalFunctionDescriptor; import org.apache.asterix.om.functions.IExternalFunctionInfo; -import org.apache.asterix.om.functions.IFunctionDescriptor; +import org.apache.asterix.runtime.functions.FunctionTypeInferers; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; public class ExternalFunctionDescriptorProvider { - public static IFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo) + public static IExternalFunctionDescriptor resolveExternalFunction(AbstractFunctionCallExpression expr, + IVariableTypeEnvironment inputTypeEnv, JobGenContext context) throws AlgebricksException { + IExternalFunctionDescriptor fd = getExternalFunctionDescriptor((IExternalFunctionInfo) expr.getFunctionInfo()); + CompilerProperties props = ((IApplicationContext) context.getAppContext()).getCompilerProperties(); + FunctionTypeInferers.SET_ARGUMENTS_TYPE.infer(expr, fd, inputTypeEnv, props); + fd.setSourceLocation(expr.getSourceLocation()); + 
return fd; + } + + private static IExternalFunctionDescriptor getExternalFunctionDescriptor(IExternalFunctionInfo finfo) throws AlgebricksException { switch (finfo.getKind()) { case SCALAR: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java index 61ab3ea..63f0a13 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java @@ -19,15 +19,15 @@ package org.apache.asterix.external.library; +import org.apache.asterix.om.functions.IExternalFunctionDescriptor; import org.apache.asterix.om.functions.IExternalFunctionInfo; -import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; public class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDynamicDescriptor - implements IFunctionDescriptor { + implements IExternalFunctionDescriptor { private static final long serialVersionUID = 2L; private final IExternalFunctionInfo finfo; @@ -54,4 +54,14 @@ public class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDyna public FunctionIdentifier getIdentifier() { return finfo.getFunctionIdentifier(); } + + @Override + public IExternalFunctionInfo getFunctionInfo() { + return finfo; + } + + @Override + public IAType[] getArgumentTypes() { + return argTypes; + } } \ No newline at end of file diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java new file mode 100644 index 0000000..44a17a9 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.external.operators; + +import org.apache.asterix.om.functions.IExternalFunctionDescriptor; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private int[] outColumns; + private final IExternalFunctionDescriptor[] fnDescs; + private final int[][] fnArgColumns; + + public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs, + int[][] fnArgColumns, int[] projectionList) { + super(projectionList); + this.outColumns = outColumns; + this.fnDescs = fnDescs; + this.fnArgColumns = fnArgColumns; + } + + @Override + public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) + throws HyracksDataException { + throw new HyracksDataException(ErrorCode.OPERATOR_NOT_IMPLEMENTED, sourceLoc, + PhysicalOperatorTag.ASSIGN_BATCH.toString()); + } +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java index 029b13a..f900c92 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java @@ -29,6 +29,7 @@ import 
org.apache.asterix.common.functions.ExternalFunctionLanguage; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.BuiltinTypeMap; import org.apache.asterix.metadata.entities.Function; +import org.apache.asterix.om.functions.IExternalFunctionInfo; import org.apache.asterix.om.typecomputer.base.IResultTypeComputer; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; @@ -167,4 +168,23 @@ public class ExternalFunctionCompilerUtil { String.valueOf(actualSize), language.name()); } } + + public static boolean supportsBatchInvocation(FunctionKind fnKind, IFunctionInfo fnInfo) + throws CompilationException { + if (fnKind != FunctionKind.SCALAR) { + return false; + } + if (!(fnInfo instanceof IExternalFunctionInfo)) { + return false; + } + ExternalFunctionLanguage language = ((IExternalFunctionInfo) fnInfo).getLanguage(); + switch (language) { + case JAVA: + return false; + case PYTHON: + return false; + default: + throw new CompilationException(ErrorCode.METADATA_ERROR, language.name()); + } + } } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java new file mode 100644 index 0000000..8d362ce --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionDescriptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.om.functions; + +import org.apache.asterix.om.types.IAType; + +public interface IExternalFunctionDescriptor extends IFunctionDescriptor { + + IExternalFunctionInfo getFunctionInfo(); + + IAType[] getArgumentTypes(); +} diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java index b7f6d62..d590f71 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java @@ -21,6 +21,7 @@ package org.apache.hyracks.algebricks.core.algebra.base; public enum PhysicalOperatorTag { AGGREGATE, ASSIGN, + ASSIGN_BATCH, BROADCAST_EXCHANGE, BTREE_SEARCH, BULKLOAD, @@ -33,7 +34,6 @@ public enum PhysicalOperatorTag { FORWARD, HASH_PARTITION_EXCHANGE, HASH_PARTITION_MERGE_EXCHANGE, - HDFS_READER, HYBRID_HASH_JOIN, IN_MEMORY_HASH_JOIN, MICRO_STABLE_SORT, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java similarity index 75% copy from 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java copy to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java index 23fa1ee..4466a25 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/AbstractAssignPOperator.java @@ -16,39 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.core.algebra.operators.physical; + +package org.apache.hyracks.algebricks.core.algebra.operators; import java.util.List; -import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; -import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; -import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -public class AssignPOperator extends AbstractPhysicalOperator { +public abstract class AbstractAssignPOperator extends AbstractPhysicalOperator { - private boolean flushFramesRapidly; - private String[] locations; + protected boolean flushFramesRapidly; - @Override - public PhysicalOperatorTag getOperatorTag() { - return PhysicalOperatorTag.ASSIGN; - } + protected String[] locations; @Override public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { @@ -72,23 +65,16 @@ public class AssignPOperator extends AbstractPhysicalOperator { throws AlgebricksException { AssignOperator assign = (AssignOperator) op; List<LogicalVariable> variables = assign.getVariables(); - List<Mutable<ILogicalExpression>> expressions = assign.getExpressions(); int[] outColumns = new int[variables.size()]; for (int i = 0; i < outColumns.length; i++) { outColumns[i] = opSchema.findVariable(variables.get(i)); } - IScalarEvaluatorFactory[] evalFactories = new IScalarEvaluatorFactory[expressions.size()]; - IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); - for (int i = 0; i < evalFactories.length; i++) { - evalFactories[i] = expressionRuntimeProvider.createEvaluatorFactory(expressions.get(i).getValue(), - context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context); - } // TODO push projections into the operator int[] projectionList = JobGenHelper.projectAllVariables(opSchema); - 
AssignRuntimeFactory runtime = - new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly); + IPushRuntimeFactory runtime = + createRuntimeFactory(context, assign, opSchema, inputSchemas, outColumns, projectionList); runtime.setSourceLocation(assign.getSourceLocation()); // contribute one Asterix framewriter @@ -105,6 +91,10 @@ public class AssignPOperator extends AbstractPhysicalOperator { builder.contributeGraphEdge(src, 0, assign, 0); } + protected abstract IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList) + throws AlgebricksException; + @Override public boolean isMicroOperator() { return true; diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java index 23fa1ee..5f927fd 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java @@ -21,105 +21,35 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical; import java.util.List; import org.apache.commons.lang3.mutable.Mutable; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import 
org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; -import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.AbstractAssignPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -public class AssignPOperator extends AbstractPhysicalOperator { - - private boolean flushFramesRapidly; - private String[] locations; +public class AssignPOperator extends AbstractAssignPOperator { @Override public PhysicalOperatorTag getOperatorTag() { return PhysicalOperatorTag.ASSIGN; } - @Override - public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) { - AssignOperator assignOp = (AssignOperator) op; - ILogicalOperator op2 = op.getInputs().get(0).getValue(); - deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); - if (assignOp.getExplicitOrderingProperty() != null) { - deliveredProperties.getLocalProperties().add(assignOp.getExplicitOrderingProperty()); - } - } - - @Override - public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, - 
IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) { - return emptyUnaryRequirements(); - } - - @Override - public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, - IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + protected IPushRuntimeFactory createRuntimeFactory(JobGenContext context, AssignOperator op, + IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, int[] outColumns, int[] projectionList) throws AlgebricksException { - AssignOperator assign = (AssignOperator) op; - List<LogicalVariable> variables = assign.getVariables(); - List<Mutable<ILogicalExpression>> expressions = assign.getExpressions(); - int[] outColumns = new int[variables.size()]; - for (int i = 0; i < outColumns.length; i++) { - outColumns[i] = opSchema.findVariable(variables.get(i)); - } + List<Mutable<ILogicalExpression>> expressions = op.getExpressions(); IScalarEvaluatorFactory[] evalFactories = new IScalarEvaluatorFactory[expressions.size()]; IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); for (int i = 0; i < evalFactories.length; i++) { evalFactories[i] = expressionRuntimeProvider.createEvaluatorFactory(expressions.get(i).getValue(), context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context); } - - // TODO push projections into the operator - int[] projectionList = JobGenHelper.projectAllVariables(opSchema); - - AssignRuntimeFactory runtime = - new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly); - runtime.setSourceLocation(assign.getSourceLocation()); - - // contribute one Asterix framewriter - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - if (locations != null && locations.length > 0) { - AlgebricksAbsolutePartitionConstraint locationConstraint = - new 
AlgebricksAbsolutePartitionConstraint(locations); - builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint); - } else { - builder.contributeMicroOperator(assign, runtime, recDesc); - } - // and contribute one edge from its child - ILogicalOperator src = assign.getInputs().get(0).getValue(); - builder.contributeGraphEdge(src, 0, assign, 0); - } - - @Override - public boolean isMicroOperator() { - return true; - } - - public void setRapidFrameFlush(boolean flushFramesRapidly) { - this.flushFramesRapidly = flushFramesRapidly; - } - - public void setLocationConstraint(String[] locations) { - this.locations = locations; - } - - @Override - public boolean expensiveThanMaterialization() { - return false; + return new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly); } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java index 3f7a0c1..5e10819 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.function.BiFunction; -import java.util.function.BiPredicate; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -34,6 +33,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; @@ -250,21 +250,11 @@ public final class PlanStabilityVerifier { } private static <T> int findItem(List<T> list, T item) { - return indexOf(list, (listItem, paramItem) -> listItem == paramItem, item); + return OperatorManipulationUtil.indexOf(list, (listItem, paramItem) -> listItem == paramItem, item); } private static <T> int findNonNull(List<T> list) { - return indexOf(list, (listItem, none) -> listItem != null, null); - } - - private static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) { - for (int i = 0, n = list.size(); i < n; i++) { - T listItem = list.get(i); - if (predicate.test(listItem, predicateParam)) { - return i; - } - } - return -1; + return OperatorManipulationUtil.indexOf(list, (listItem, none) -> listItem != null, null); } static String printOperator(Mutable<ILogicalOperator> opRef, IPlanPrettyPrinter printer) { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index dd109ff..aa2ecdc 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -27,6 +27,7 @@ import java.util.Deque; import java.util.List; import 
java.util.Map; import java.util.Set; +import java.util.function.BiPredicate; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -385,8 +386,21 @@ public class OperatorManipulationUtil { * @return operator position in the given list or {@code -1} if not found */ public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) { + return indexOf(list, (listItemOpRef, paramOp) -> listItemOpRef.getValue() == paramOp, op); + } + + /** + * Find an item in a given list + * + * @param list list to search in + * @param predicate predicate to test + * @param predicateParam parameter to pass to the predicate + * @return item position in the given list or {@code -1} if not found + */ + public static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) { for (int i = 0, ln = list.size(); i < ln; i++) { - if (list.get(i).getValue() == op) { + T listItem = list.get(i); + if (predicate.test(listItem, predicateParam)) { return i; } } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java index bbb6cbd..bf9ab6a 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateAssignsRule.java @@ -35,6 +35,12 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; public class ConsolidateAssignsRule implements IAlgebraicRewriteRule { + private final boolean recomputeSchema; + + public ConsolidateAssignsRule(boolean recomputeSchema) { + this.recomputeSchema = recomputeSchema; + } + @Override public boolean
rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) { return false; @@ -76,6 +82,9 @@ public class ConsolidateAssignsRule implements IAlgebraicRewriteRule { asgnInpList.clear(); asgnInpList.add(botOpRef); context.computeAndSetTypeEnvironmentForOperator(assign1); + if (recomputeSchema) { + assign1.recomputeSchema(); + } return true; } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 7243827..b95971f 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -190,7 +190,7 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule } @Override - public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) { + public IPhysicalOperator visitAssignOperator(AssignOperator op, Boolean topLevelOp) throws AlgebricksException { return new AssignPOperator(); }
