http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java index 4a79387..b1f646a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java @@ -36,7 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -187,7 +187,7 @@ public class SweepIllegalNonfunctionalFunctions extends AbstractExtractExprRule } @Override - public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException { + public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { return null; }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java index 17dec7c..c033214 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.asterix.algebra.extension.IAlgebraExtensionManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails; +import org.apache.asterix.external.feed.watch.FeedActivityDetails; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java index 16ac80d..ec29b53 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.asterix.om.util.ConstantExpressionUtil; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer; import org.apache.asterix.metadata.api.IMetadataEntity; @@ -39,6 +38,7 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.om.util.ConstantExpressionUtil; import org.apache.asterix.optimizer.rules.am.OptimizableOperatorSubTree.DataSourceType; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -72,15 +72,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew private AqlMetadataProvider metadataProvider; // Function Identifier sets that retain the original field variable through each function's arguments - private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName = - ImmutableSet.of(AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS, - AsterixBuiltinFunctions.SUBSTRING, AsterixBuiltinFunctions.SUBSTRING_BEFORE, - AsterixBuiltinFunctions.SUBSTRING_AFTER, AsterixBuiltinFunctions.CREATE_POLYGON, - AsterixBuiltinFunctions.CREATE_MBR, AsterixBuiltinFunctions.CREATE_RECTANGLE, - AsterixBuiltinFunctions.CREATE_CIRCLE, AsterixBuiltinFunctions.CREATE_LINE, - AsterixBuiltinFunctions.CREATE_POINT, AsterixBuiltinFunctions.NUMERIC_ADD, - AsterixBuiltinFunctions.NUMERIC_SUBTRACT, AsterixBuiltinFunctions.NUMERIC_MULTIPLY, - AsterixBuiltinFunctions.NUMERIC_DIVIDE, AsterixBuiltinFunctions.NUMERIC_MOD); + private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName = ImmutableSet.of( + AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS, AsterixBuiltinFunctions.SUBSTRING, + AsterixBuiltinFunctions.SUBSTRING_BEFORE, AsterixBuiltinFunctions.SUBSTRING_AFTER, + AsterixBuiltinFunctions.CREATE_POLYGON, AsterixBuiltinFunctions.CREATE_MBR, + AsterixBuiltinFunctions.CREATE_RECTANGLE, AsterixBuiltinFunctions.CREATE_CIRCLE, + AsterixBuiltinFunctions.CREATE_LINE, AsterixBuiltinFunctions.CREATE_POINT, + AsterixBuiltinFunctions.NUMERIC_ADD, AsterixBuiltinFunctions.NUMERIC_SUBTRACT, + AsterixBuiltinFunctions.NUMERIC_MULTIPLY, AsterixBuiltinFunctions.NUMERIC_DIVIDE, + AsterixBuiltinFunctions.NUMERIC_MOD); public abstract Map<FunctionIdentifier, List<IAccessMethod>> getAccessMethods(); @@ -108,7 +108,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context) - throws AlgebricksException { + throws AlgebricksException { Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator(); // Check applicability of indexes by access method type. while (amIt.hasNext()) { @@ -145,15 +145,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew return list.isEmpty() ? null : list.get(0); } - protected List<Pair<IAccessMethod, Index>> - chooseAllIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) { + protected List<Pair<IAccessMethod, Index>> chooseAllIndex( + Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) { List<Pair<IAccessMethod, Index>> result = new ArrayList<Pair<IAccessMethod, Index>>(); Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator(); while (amIt.hasNext()) { Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next(); AccessMethodAnalysisContext analysisCtx = amEntry.getValue(); - Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt = - analysisCtx.indexExprsAndVars.entrySet().iterator(); + Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt = analysisCtx.indexExprsAndVars.entrySet() + .iterator(); while (indexIt.hasNext()) { Map.Entry<Index, List<Pair<Integer, Integer>>> indexEntry = indexIt.next(); // To avoid a case where the chosen access method and a chosen @@ -167,15 +167,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew // LENGTH_PARTITIONED_NGRAM_INVIX] IAccessMethod chosenAccessMethod = amEntry.getKey(); Index chosenIndex = indexEntry.getKey(); - boolean isKeywordOrNgramIndexChosen = - chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX - || chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX - || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX - || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX; - - if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && chosenIndex.getIndexType() == IndexType.BTREE) - || (chosenAccessMethod == RTreeAccessMethod.INSTANCE - && chosenIndex.getIndexType() == IndexType.RTREE) + IndexType indexType = chosenIndex.getIndexType(); + boolean isKeywordOrNgramIndexChosen = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX + || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX + || indexType == IndexType.SINGLE_PARTITION_WORD_INVIX + || indexType == IndexType.SINGLE_PARTITION_NGRAM_INVIX; + + if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && indexType == IndexType.BTREE) + || (chosenAccessMethod == RTreeAccessMethod.INSTANCE && indexType == IndexType.RTREE) || (chosenAccessMethod == InvertedIndexAccessMethod.INSTANCE && isKeywordOrNgramIndexChosen)) { result.add(new Pair<IAccessMethod, Index>(chosenAccessMethod, chosenIndex)); } @@ -196,8 +195,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew */ public void pruneIndexCandidates(IAccessMethod accessMethod, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, IVariableTypeEnvironment typeEnvironment) throws AlgebricksException { - Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt = - analysisCtx.indexExprsAndVars.entrySet().iterator(); + Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt = analysisCtx.indexExprsAndVars + .entrySet().iterator(); // Used to keep track of matched expressions (added for prefix search) int numMatchedKeys = 0; ArrayList<Integer> matchedExpressions = new ArrayList<Integer>(); @@ -226,24 +225,22 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew } boolean typeMatch = true; //Prune indexes based on field types - List<IAType> indexedTypes = new ArrayList<IAType>(); + List<IAType> matchedTypes = new ArrayList<>(); //retrieve types of expressions joined/selected with an indexed field for (int j = 0; j < optFuncExpr.getNumLogicalVars(); j++) { if (j != exprAndVarIdx.second) { - indexedTypes.add(optFuncExpr.getFieldType(j)); + matchedTypes.add(optFuncExpr.getFieldType(j)); } } - //add constants in case of select - if (indexedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1 - && optFuncExpr.getNumConstantAtRuntimeExpr() > 0) { - indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType( + if (matchedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1) { + matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType( optFuncExpr.getConstantAtRuntimeExpr(0), context.getMetadataProvider(), typeEnvironment)); } //infer type of logicalExpr based on index keyType - indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType( + matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType( optFuncExpr.getLogicalExpr(exprAndVarIdx.second), null, new IVariableTypeEnvironment() { @Override @@ -257,7 +254,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew @Override public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables, List<List<LogicalVariable>> correlatedNullableVariableLists) - throws AlgebricksException { + throws AlgebricksException { if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) { return keyType; } @@ -285,16 +282,16 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew boolean jaccardSimilarity = optFuncExpr.getFuncExpr().getFunctionIdentifier().getName() .startsWith("similarity-jaccard-check"); - for (int j = 0; j < indexedTypes.size(); j++) { - for (int k = j + 1; k < indexedTypes.size(); k++) { - typeMatch &= isMatched(indexedTypes.get(j), indexedTypes.get(k), jaccardSimilarity); + for (int j = 0; j < matchedTypes.size(); j++) { + for (int k = j + 1; k < matchedTypes.size(); k++) { + typeMatch &= isMatched(matchedTypes.get(j), matchedTypes.get(k), jaccardSimilarity); } } // Check if any field name in the optFuncExpr matches. if (optFuncExpr.findFieldName(keyField) != -1) { - foundKeyField = - typeMatch && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan(); + foundKeyField = typeMatch + && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan(); if (foundKeyField) { matchedExpressions.add(exprAndVarIdx.first); numMatchedKeys++; @@ -369,8 +366,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew continue; } AbstractFunctionCallExpression argFuncExpr = (AbstractFunctionCallExpression) argExpr; - boolean matchFound = - analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context, typeEnvironment); + boolean matchFound = analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context, + typeEnvironment); found = found || matchFound; } return found; @@ -435,14 +432,13 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType, IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx, OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx) - throws AlgebricksException { + throws AlgebricksException { List<Index> indexCandidates = new ArrayList<Index>(); // Add an index to the candidates if one of the indexed fields is // fieldName for (Index index : datasetIndexes) { // Need to also verify the index is pending no op - if (index.getKeyFieldNames().contains(fieldName) - && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) { + if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) { indexCandidates.add(index); if (optFuncExpr.getFieldType(varIdx) == BuiltinType.AMISSING || optFuncExpr.getFieldType(varIdx) == BuiltinType.ANY) { @@ -540,8 +536,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew return; } } - IAType fieldType = - (IAType) context.getOutputTypeEnvironment(unnestOp).getType(optFuncExpr.getLogicalExpr(funcVarIndex)); + IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp) + .getType(optFuncExpr.getLogicalExpr(funcVarIndex)); // Set the fieldName in the corresponding matched function // expression. optFuncExpr.setFieldName(funcVarIndex, fieldName); @@ -571,16 +567,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew // Remember matching subtree. optFuncExpr.setOptimizableSubTree(optVarIndex, subTree); - List<String> fieldName = - getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex, - subTree.getRecordType(), optVarIndex, - optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), - datasetRecordVar, subTree.getMetaRecordType(), datasetMetaVar); + List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex, + subTree.getRecordType(), optVarIndex, + optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar, + subTree.getMetaRecordType(), datasetMetaVar); if (fieldName == null) { continue; } - IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp) - .getType(optFuncExpr.getLogicalExpr(optVarIndex)); + IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp).getVarType(var); // Set the fieldName in the corresponding matched // function expression. optFuncExpr.setFieldName(optVarIndex, fieldName); @@ -597,7 +591,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex, List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree, AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource) - throws AlgebricksException { + throws AlgebricksException { for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) { LogicalVariable var = dsVarList.get(varIndex); int funcVarIndex = optFuncExpr.findLogicalVar(var); @@ -615,16 +609,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew // Check whether this variable is PK, not a record variable. if (varIndex <= subTreePKs.size() - 1) { fieldName = subTreePKs.get(varIndex); - fieldType = - (IAType) context.getOutputTypeEnvironment( - subTree.getDataSourceRef().getValue()).getVarType(var); + fieldType = (IAType) context.getOutputTypeEnvironment(subTree.getDataSourceRef().getValue()) + .getVarType(var); } } else { // Need to check additional dataset one by one for (int i = 0; i < subTree.getIxJoinOuterAdditionalDatasets().size(); i++) { if (subTree.getIxJoinOuterAdditionalDatasets().get(i) != null) { - subTreePKs = DatasetUtils.getPartitioningKeys( - subTree.getIxJoinOuterAdditionalDatasets().get(i)); + subTreePKs = DatasetUtils + .getPartitioningKeys(subTree.getIxJoinOuterAdditionalDatasets().get(i)); // Check whether this variable is PK, not a record variable. if (subTreePKs.contains(var) && varIndex <= subTreePKs.size() - 1) { @@ -667,11 +660,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew * * @throws AlgebricksException */ - protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, - OptimizableOperatorSubTree subTree, int opIndex, int assignVarIndex, ARecordType recordType, - int funcVarIndex, ILogicalExpression parentFuncExpr, LogicalVariable recordVar, - ARecordType metaType, LogicalVariable metaVar) - throws AlgebricksException { + protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree, + int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex, + ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar) + throws AlgebricksException { // Get expression corresponding to opVar at varIndex. AbstractLogicalExpression expr = null; AbstractFunctionCallExpression childFuncExpr = null; @@ -679,6 +671,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) { AssignOperator assignOp = (AssignOperator) op; expr = (AbstractLogicalExpression) assignOp.getExpressions().get(assignVarIndex).getValue(); + if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { + //Otherwise the cast for childFuncExpr would fail + return null; + } childFuncExpr = (AbstractFunctionCallExpression) expr; } else { UnnestOperator unnestOp = (UnnestOperator) op; @@ -723,8 +719,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew return null; } ConstantExpression constExpr = (ConstantExpression) nameArg; - AOrderedList orderedNestedFieldName = - (AOrderedList) ((AsterixConstantValue) constExpr.getValue()).getObject(); + AOrderedList orderedNestedFieldName = (AOrderedList) ((AsterixConstantValue) constExpr.getValue()) + .getObject(); nestedAccessFieldName = new ArrayList<String>(); for (int i = 0; i < orderedNestedFieldName.size(); i++) { nestedAccessFieldName.add(((AString) orderedNestedFieldName.getItem(i)).getStringValue()); @@ -733,8 +729,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew isByName = true; } if (isFieldAccess) { - LogicalVariable sourceVar = - ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference(); + LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()) + .getVariableReference(); optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr); int[] assignAndExpressionIndexes = null; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java index 874cc7c..23e45c4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java @@ -56,7 +56,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -490,7 +490,7 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper } @Override - public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java index eeb2c2a..d3a0c0f 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java @@ -41,7 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -255,7 +255,7 @@ class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisit } @Override - public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException { + public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { return visitSingleInputOperator(op); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java index ccf0aeb..44bfbe4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java @@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -143,7 +143,7 @@ class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Bool } @Override - public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException { + public Boolean visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException { return false; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java index b184774..98c717c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java @@ -295,12 +295,17 @@ public class CompiledStatements { private final String datasetName; private final Query query; private final int varCounter; + VariableExpr var; + Query returnQuery; - public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter) { + public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter, + VariableExpr var, Query returnQuery) { this.dataverseName = dataverseName; this.datasetName = datasetName; this.query = query; this.varCounter = varCounter; + this.var = var; + this.returnQuery = returnQuery; } @Override @@ -321,6 +326,14 @@ public class CompiledStatements { return query; } + public VariableExpr getVar() { + return var; + } + + public Query getReturnQuery() { + return returnQuery; + } + @Override public byte getKind() { return Statement.Kind.INSERT; @@ -329,8 +342,9 @@ public class CompiledStatements { public static class CompiledUpsertStatement extends CompiledInsertStatement { - public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter) { - super(dataverseName, datasetName, query, varCounter); + public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter, + VariableExpr var, Query returnQuery) { + super(dataverseName, datasetName, query, varCounter, var, returnQuery); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 1b528b9..149656a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -116,6 +116,8 @@ public interface IStatementExecutor { * @param dmlStatement * The data modification statement when the query results in a modification to a dataset * @return the compiled {@code JobSpecification} + * @param returnQuery + * In the case of dml, the user may run a query on affected data * @throws AsterixException * @throws RemoteException * @throws AlgebricksException @@ -124,7 +126,7 @@ public interface IStatementExecutor { */ JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query, ICompiledDmlStatement dmlStatement) - throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException; + throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException; /** * returns the active dataverse for an entity or a statement http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index 09a0476..9879da8 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -27,11 +27,12 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator; +import org.apache.asterix.algebra.operators.CommitOperator; import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.AsterixException; @@ -40,15 +41,15 @@ import org.apache.asterix.common.functions.FunctionSignature; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.lang.aql.util.RangeMapBuilder; import org.apache.asterix.lang.common.base.Expression; +import org.apache.asterix.lang.common.base.Expression.Kind; import org.apache.asterix.lang.common.base.ILangExpression; import org.apache.asterix.lang.common.base.Statement; -import org.apache.asterix.lang.common.base.Expression.Kind; import org.apache.asterix.lang.common.clause.GroupbyClause; import org.apache.asterix.lang.common.clause.LetClause; import org.apache.asterix.lang.common.clause.LimitClause; import org.apache.asterix.lang.common.clause.OrderbyClause; -import org.apache.asterix.lang.common.clause.WhereClause; import org.apache.asterix.lang.common.clause.OrderbyClause.OrderModifier; +import org.apache.asterix.lang.common.clause.WhereClause; import org.apache.asterix.lang.common.expression.CallExpr; import org.apache.asterix.lang.common.expression.FieldAccessor; import org.apache.asterix.lang.common.expression.FieldBinding; @@ -56,14 +57,14 @@ import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair; import org.apache.asterix.lang.common.expression.IfExpr; import org.apache.asterix.lang.common.expression.IndexAccessor; import org.apache.asterix.lang.common.expression.ListConstructor; +import org.apache.asterix.lang.common.expression.ListConstructor.Type; import org.apache.asterix.lang.common.expression.LiteralExpr; import org.apache.asterix.lang.common.expression.OperatorExpr; import org.apache.asterix.lang.common.expression.QuantifiedExpression; +import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier; import org.apache.asterix.lang.common.expression.RecordConstructor; import org.apache.asterix.lang.common.expression.UnaryExpr; import org.apache.asterix.lang.common.expression.VariableExpr; -import org.apache.asterix.lang.common.expression.ListConstructor.Type; -import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier; import org.apache.asterix.lang.common.literal.StringLiteral; import org.apache.asterix.lang.common.statement.FunctionDecl; import org.apache.asterix.lang.common.statement.Query; @@ -74,13 +75,13 @@ import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; import org.apache.asterix.metadata.declared.AqlMetadataProvider; import org.apache.asterix.metadata.declared.AqlSourceId; import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.metadata.declared.LoadableDataSource; import org.apache.asterix.metadata.declared.ResultSetDataSink; import org.apache.asterix.metadata.declared.ResultSetSinkId; -import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.entities.Function; @@ -96,8 +97,10 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.util.AsterixAppContextInfo; +import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement; +import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.util.FunctionCollection; import org.apache.asterix.translator.util.PlanTranslationUtil; @@ -116,15 +119,15 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation; +import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide; import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation; import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; -import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; -import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide; import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; @@ -133,6 +136,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; @@ -140,13 +144,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; @@ -286,15 +290,30 @@ class LangExpressionToPlanTranslator @Override public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt) throws AlgebricksException { - Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, - new MutableObject<>(new EmptyTupleSourceOperator())); + return translate(expr, outputDatasetName, stmt, null); + } + + public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt, + ILogicalOperator baseOp) throws AlgebricksException { + MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator()); + if (baseOp != null) { + base = new MutableObject<>(baseOp); + } + Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, base); ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>(); ILogicalOperator topOp = p.first; - ProjectOperator project = (ProjectOperator) topOp; - LogicalVariable unnestVar = project.getVariables().get(0); - LogicalVariable resVar = project.getVariables().get(0); if (outputDatasetName == null) { + LogicalVariable resVar; + if (topOp instanceof ProjectOperator) { + resVar = ((ProjectOperator) topOp).getVariables().get(0); + } else if (topOp instanceof AssignOperator) { + resVar = ((AssignOperator) topOp).getVariables().get(0); + } else if (topOp instanceof AggregateOperator) { + resVar = ((AggregateOperator) topOp).getVariables().get(0); + } else { + throw new AlgebricksException("Invalid returning query"); + } FileSplit outputFileSplit = metadataProvider.getOutputFile(); if (outputFileSplit == null) { outputFileSplit = getDefaultOutputFileLocation(); @@ -305,8 +324,9 @@ class LangExpressionToPlanTranslator writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar))); ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId()); ResultSetDataSink sink = new ResultSetDataSink(rssId, null); - topOp = new DistributeResultOperator(writeExprList, sink); - topOp.getInputs().add(new MutableObject<>(project)); + DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink); + newTop.getInputs().add(new MutableObject<>(topOp)); + topOp = newTop; // Retrieve the Output RecordType (if any) and store it on // the DistributeResultOperator @@ -315,6 +335,10 @@ class LangExpressionToPlanTranslator topOp.getAnnotations().put("output-record-type", outputRecordType); } } else { + ProjectOperator project = (ProjectOperator) topOp; + LogicalVariable unnestVar = project.getVariables().get(0); + LogicalVariable resVar = project.getVariables().get(0); + /** * add the collection-to-sequence right before the project, * because dataset only accept non-collection records @@ -380,12 +404,12 @@ class LangExpressionToPlanTranslator switch (stmt.getKind()) { case Statement.Kind.INSERT: leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading, - additionalFilteringExpressions, assign); + additionalFilteringExpressions, assign, stmt); break; case Statement.Kind.UPSERT: leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading, additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, project, exprs, - resVar, additionalFilteringAssign); + resVar, additionalFilteringAssign, stmt); break; case Statement.Kind.DELETE: leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading, @@ -418,7 +442,7 @@ class LangExpressionToPlanTranslator varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false); insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); insertOp.getInputs().add(new MutableObject<>(assign)); - SinkOperator leafOperator = new SinkOperator(); + ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true)); leafOperator.getInputs().add(new MutableObject<>(insertOp)); return leafOperator; } @@ -426,7 +450,7 @@ class LangExpressionToPlanTranslator private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef, List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign) - throws AlgebricksException { + throws AlgebricksException { if (targetDatasource.getDataset().hasMetaPart()) { throw new AlgebricksException(targetDatasource.getDataset().getDatasetName() + ": delete from dataset is not supported on Datasets with Meta records"); @@ -435,7 +459,7 @@ class LangExpressionToPlanTranslator varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false); deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); deleteOp.getInputs().add(new MutableObject<>(assign)); - SinkOperator leafOperator = new SinkOperator(); + ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true)); leafOperator.getInputs().add(new MutableObject<>(deleteOp)); return leafOperator; } @@ -528,7 +552,7 @@ class LangExpressionToPlanTranslator project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign)); } feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - SinkOperator leafOperator = new SinkOperator(); + ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true)); leafOperator.getInputs().add(new MutableObject<>(feedModificationOp)); return leafOperator; } @@ -537,14 +561,20 @@ class LangExpressionToPlanTranslator List<Mutable<ILogicalExpression>> varRefsForLoading, List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign, List<String> additionalFilteringField, LogicalVariable unnestVar, ProjectOperator project, - List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign) - throws AlgebricksException { + List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign, + ICompiledDmlStatement stmt) throws AlgebricksException { if (!targetDatasource.getDataset().allow(project, Dataset.OP_UPSERT)) { throw new AlgebricksException(targetDatasource.getDataset().getDatasetName() + ": upsert into dataset is not supported on Datasets with Meta records"); } + CompiledUpsertStatement compiledUpsert = (CompiledUpsertStatement) stmt; + InsertDeleteUpsertOperator upsertOp; + ILogicalOperator leafOperator; if (targetDatasource.getDataset().hasMetaPart()) { - InsertDeleteUpsertOperator feedModificationOp; + if (compiledUpsert.getReturnQuery() != null) { + throw new AlgebricksException("Returning not allowed on datasets with Meta records"); + + } AssignOperator metaAndKeysAssign; List<LogicalVariable> metaAndKeysVars; List<Mutable<ILogicalExpression>> metaAndKeysExprs; @@ -575,71 +605,113 @@ class LangExpressionToPlanTranslator } } // A change feed, we don't need the assign to access PKs - feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, - metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false); + upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList, + InsertDeleteUpsertOperator.Kind.UPSERT, false); // Create and add a new variable used for representing the original record - feedModificationOp.setPrevRecordVar(context.newVar()); - feedModificationOp.setPrevRecordType(targetDatasource.getItemType()); + upsertOp.setPrevRecordVar(context.newVar()); + upsertOp.setPrevRecordType(targetDatasource.getItemType()); if (targetDatasource.getDataset().hasMetaPart()) { List<LogicalVariable> metaVars = new ArrayList<>(); metaVars.add(context.newVar()); - feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars); + upsertOp.setPrevAdditionalNonFilteringVars(metaVars); List<Object> metaTypes = new ArrayList<>(); metaTypes.add(targetDatasource.getMetaItemType()); - feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes); + upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes); } if (additionalFilteringField != null) { - feedModificationOp.setPrevFilterVar(context.newVar()); - feedModificationOp.setPrevFilterType( + upsertOp.setPrevFilterVar(context.newVar()); + upsertOp.setPrevFilterType( ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0))); additionalFilteringAssign.getInputs().clear(); additionalFilteringAssign.getInputs().add(assign.getInputs().get(0)); - feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign)); + upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign)); } else { - feedModificationOp.getInputs().add(assign.getInputs().get(0)); + upsertOp.getInputs().add(assign.getInputs().get(0)); } metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs); metaAndKeysAssign.getInputs().add(project.getInputs().get(0)); project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign)); - feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - SinkOperator leafOperator = new SinkOperator(); - leafOperator.getInputs().add(new MutableObject<>(feedModificationOp)); - return leafOperator; + upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); + leafOperator = new DelegateOperator(new CommitOperator(true)); + leafOperator.getInputs().add(new MutableObject<>(upsertOp)); + } else { - InsertDeleteUpsertOperator feedModificationOp; - feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, + upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false); - feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - feedModificationOp.getInputs().add(new MutableObject<>(assign)); + upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); + upsertOp.getInputs().add(new MutableObject<>(assign)); // Create and add a new variable used for representing the original record ARecordType recordType = (ARecordType) targetDatasource.getItemType(); - feedModificationOp.setPrevRecordVar(context.newVar()); - feedModificationOp.setPrevRecordType(recordType); + upsertOp.setPrevRecordVar(context.newVar()); + upsertOp.setPrevRecordType(recordType); if (additionalFilteringField != null) { - feedModificationOp.setPrevFilterVar(context.newVar()); - feedModificationOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0))); + upsertOp.setPrevFilterVar(context.newVar()); + upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0))); + } + + if (compiledUpsert.getReturnQuery() != null) { + leafOperator = createReturningQuery(compiledUpsert, upsertOp); + + } else { + leafOperator = new DelegateOperator(new CommitOperator(true)); + leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp)); } - SinkOperator leafOperator = new SinkOperator(); - leafOperator.getInputs().add(new MutableObject<>(feedModificationOp)); - return leafOperator; } + return leafOperator; + + } + + private ILogicalOperator createReturningQuery(CompiledInsertStatement compiledInsert, + InsertDeleteUpsertOperator insertOp) throws AlgebricksException { + //Make the id of the insert var point to the record variable + context.newVar(compiledInsert.getVar()); + context.setVar(compiledInsert.getVar(), + ((VariableReferenceExpression) insertOp.getPayloadExpression().getValue()).getVariableReference()); + // context + + ILogicalPlan planAfterInsert = translate(compiledInsert.getReturnQuery(), null, null, insertOp); + + ILogicalOperator finalRoot = planAfterInsert.getRoots().get(0).getValue(); + ILogicalOperator op; + for (op = finalRoot;; op = op.getInputs().get(0).getValue()) { + if (op.getInputs().size() != 1) { + throw new AlgebricksException("Cannot have a multi-branch returning query"); + } + if (op.getInputs().get(0).getValue() instanceof InsertDeleteUpsertOperator) { + break; + } + } + + op.getInputs().clear(); + ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(false)); + leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp)); + op.getInputs().add(new MutableObject<>(leafOperator)); + leafOperator = finalRoot; + return leafOperator; } private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef, List<Mutable<ILogicalExpression>> varRefsForLoading, - List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign) - throws AlgebricksException { + List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign, + ICompiledDmlStatement stmt) throws AlgebricksException { if (targetDatasource.getDataset().hasMetaPart()) { throw new AlgebricksException(targetDatasource.getDataset().getDatasetName() + ": insert into dataset is not supported on Datasets with Meta records"); } + ILogicalOperator leafOperator; InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false); insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - insertOp.getInputs().add(new MutableObject<>(assign)); - SinkOperator leafOperator = new SinkOperator(); - leafOperator.getInputs().add(new MutableObject<>(insertOp)); + insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign)); + CompiledInsertStatement compiledInsert = (CompiledInsertStatement) stmt; + if (compiledInsert.getReturnQuery() != null) { + leafOperator = createReturningQuery(compiledInsert, insertOp); + + } else { + leafOperator = new DelegateOperator(new CommitOperator(true)); + leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp)); + } return leafOperator; } @@ -880,15 +952,15 @@ class LangExpressionToPlanTranslator gOp.getInputs().add(topOp); for (Entry<Expression, VariableExpr> entry : gc.getWithVarMap().entrySet()) { - Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression( - entry.getKey(), new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp)))); - List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1); + Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(entry.getKey(), + new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp)))); + List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1); flArgs.add(new MutableObject<>(listifyInput.first)); AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions - .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs); + .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs); LogicalVariable aggVar = context.newVar(); AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar), - mkSingletonArrayList(new MutableObject<>(fListify))); + mkSingletonArrayList(new MutableObject<>(fListify))); agg.getInputs().add(listifyInput.second); @@ -945,8 +1017,8 @@ class LangExpressionToPlanTranslator LogicalVariable unnestVar = context.newVar(); UnnestOperator unnestOp = new UnnestOperator(unnestVar, new MutableObject<>(new UnnestingFunctionCallExpression( - FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections - .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar)))))); + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), + Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar)))))); unnestOp.getInputs().add(new MutableObject<>(assignOp)); // Produces the final result. @@ -1514,7 +1586,7 @@ class LangExpressionToPlanTranslator // There is a shared operator reference in the query plan. // Deep copies the child plan. LogicalOperatorDeepCopyWithNewVariablesVisitor visitor = - new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null); + new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null); ILogicalOperator newChild = childRef.getValue().accept(visitor, null); LinkedHashMap<LogicalVariable, LogicalVariable> cloneVarMap = visitor .getInputToOutputVariableMapping(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java index 78c68e1..6c8019d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java @@ -25,21 +25,14 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; -import java.util.Collection; import javax.imageio.ImageIO; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.watch.FeedActivity; -import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails; - public class FeedServlet extends HttpServlet { private static final long serialVersionUID = 1L; - private static final String FEED_EXTENSION_NAME = "Feed"; @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { @@ -89,49 +82,4 @@ public class FeedServlet extends HttpServlet { PrintWriter out = response.getWriter(); out.println(outStr); } - - @SuppressWarnings("unused") - private void insertTable(StringBuilder html, Collection<FeedActivity> list) { - } - - @SuppressWarnings("null") - private void insertRow(StringBuilder html, FeedActivity activity) { - String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS); - String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS); - String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS); - - FeedConnectionId connectionId = new FeedConnectionId( - new EntityId(FEED_EXTENSION_NAME, activity.getDataverseName(), activity.getFeedName()), - activity.getDatasetName()); - int intakeRate = 0; - int storeRate = 0; - - html.append("<tr>"); - html.append("<td>" + activity.getFeedName() + "</td>"); - html.append("<td>" + activity.getDatasetName() + "</td>"); - html.append("<td>" + activity.getConnectTimestamp() + "</td>"); - //html.append("<td>" + insertLink(html, FeedDashboardServlet.getParameterizedURL(activity), "Details") + "</td>"); - html.append("<td>" + intake + "</td>"); - html.append("<td>" + compute + "</td>"); - html.append("<td>" + store + "</td>"); - String color = "black"; - if (intakeRate > storeRate) { - color = "red"; - } - if (intakeRate < 0) { - html.append("<td>" + "UNKNOWN" + "</td>"); - } else { - html.append("<td>" + insertColoredText("" + intakeRate, color) + " rec/sec" + "</td>"); - } - if (storeRate < 0) { - html.append("<td>" + "UNKNOWN" + "</td>"); - } else { - html.append("<td>" + insertColoredText("" + storeRate, color) + " rec/sec" + "</td>"); - } - html.append("</tr>"); - } - - private String insertColoredText(String s, String color) { - return "<font color=\"" + color + "\">" + s + "</font>"; - } }