Change logical plan to apply filter from secondary index - Changes the IntroduceLSMComponentFilterRule to replace the constant filter value from the query with the value carried from the secondary index search. - Can use the secondary index filter even if the query doesn't contain any filter-related condition.
Change-Id: I0e2fe0208662e5dcd49d1a22bfb58f96533e9497 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1727 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/de0ece7f Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/de0ece7f Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/de0ece7f Branch: refs/heads/master Commit: de0ece7f151cd26cf3bf455fd0aa95df51f35d55 Parents: 8cbb05c Author: Jianfeng Jia <[email protected]> Authored: Tue Jun 6 22:43:26 2017 -0700 Committer: Jianfeng Jia <[email protected]> Committed: Wed Jun 7 11:41:56 2017 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../physical/BTreeSearchPOperator.java | 3 +- .../physical/IndexSearchPOperator.java | 2 +- .../physical/InvertedIndexPOperator.java | 27 +- .../physical/RTreeSearchPOperator.java | 10 +- .../am/IntroduceLSMComponentFilterRule.java | 195 +- asterixdb/asterix-app/data/twitter/real.2.adm | 5000 ++++++++++++++++++ asterixdb/asterix-app/data/twitter/real.adm | 5000 ++++++++++++++++++ .../btree-btree-search-wo-query-filter.aql | 38 + .../queries/filter/btree-btree-search.aql | 42 + .../inverted-btree-search-wo-query-filter.aql | 40 + .../queries/filter/inverted-btree-search.aql | 43 + ...multi-index-btree-search-wo-query-filter.aql | 45 + .../queries/filter/multi-index-btree-search.aql | 48 + .../rtree-btree-search-wo-query-filter.aql | 40 + .../queries/filter/rtree-btree-search.aql | 43 + .../btree-btree-search-wo-query-filter.plan | 15 + .../results/filter/btree-btree-search.plan | 17 + .../inverted-btree-search-wo-query-filter.plan | 13 + .../results/filter/inverted-btree-search.plan | 15 + ...ulti-index-btree-search-wo-query-filter.plan | 33 + .../filter/multi-index-btree-search.plan | 35 + 
.../rtree-btree-search-wo-query-filter.plan | 15 + .../results/filter/rtree-btree-search.plan | 17 + ...btree-rtree-ngram-intersect-with-filter.plan | 36 +- .../queries/filters/delete/delete.1.ddl.aql | 96 + .../queries/filters/delete/delete.10.query.aql | 27 + .../queries/filters/delete/delete.2.update.aql | 25 + .../queries/filters/delete/delete.3.server.aql | 26 + .../queries/filters/delete/delete.4.sleep.aql | 22 + .../queries/filters/delete/delete.5.server.aql | 22 + .../queries/filters/delete/delete.6.update.aql | 22 + .../queries/filters/delete/delete.7.server.aql | 23 + .../queries/filters/delete/delete.8.sleep.aql | 20 + .../queries/filters/delete/delete.9.server.aql | 20 + .../queries/filters/upsert/upsert.1.ddl.aql | 96 + .../queries/filters/upsert/upsert.10.query.aql | 26 + .../queries/filters/upsert/upsert.11.ddl.aql | 20 + .../queries/filters/upsert/upsert.2.update.aql | 27 + .../queries/filters/upsert/upsert.3.server.aql | 26 + .../queries/filters/upsert/upsert.4.sleep.aql | 22 + .../queries/filters/upsert/upsert.5.server.aql | 22 + .../queries/filters/upsert/upsert.6.update.aql | 55 + .../queries/filters/upsert/upsert.7.server.aql | 23 + .../queries/filters/upsert/upsert.8.sleep.aql | 20 + .../queries/filters/upsert/upsert.9.server.aql | 20 + .../tinysocial-intersect.1.ddl.aql | 46 + .../tinysocial-intersect.2.update.aql | 23 + .../tinysocial-intersect.3.query.aql | 28 + .../filters/delete/delete.1.ddl.sqlpp | 96 + .../filters/delete/delete.10.query.sqlpp | 23 + .../filters/delete/delete.11.ddl.sqlpp | 27 + .../filters/delete/delete.2.update.sqlpp | 24 + .../filters/delete/delete.3.server.sqlpp | 26 + .../filters/delete/delete.4.sleep.sqlpp | 20 + .../filters/delete/delete.5.server.sqlpp | 20 + .../filters/delete/delete.6.update.sqlpp | 22 + .../filters/delete/delete.7.server.sqlpp | 23 + .../filters/delete/delete.8.sleep.sqlpp | 20 + .../filters/delete/delete.9.server.sqlpp | 20 + .../filters/upsert/upsert.1.ddl.sqlpp | 97 + 
.../filters/upsert/upsert.10.query.sqlpp | 23 + .../filters/upsert/upsert.11.ddl.sqlpp | 27 + .../filters/upsert/upsert.2.update.sqlpp | 26 + .../filters/upsert/upsert.3.server.sqlpp | 26 + .../filters/upsert/upsert.4.sleep.sqlpp | 20 + .../filters/upsert/upsert.5.server.sqlpp | 20 + .../filters/upsert/upsert.6.update.sqlpp | 53 + .../filters/upsert/upsert.7.server.sqlpp | 23 + .../filters/upsert/upsert.8.sleep.sqlpp | 20 + .../filters/upsert/upsert.9.server.sqlpp | 20 + .../results/filters/delete/delete.1.adm | 0 .../results/filters/upsert/upsert.1.adm | 1 + .../intersection-with-filter/intersection.1.adm | 1 + .../src/test/resources/runtimets/testsuite.xml | 15 + .../resources/runtimets/testsuite_sqlpp.xml | 10 + .../metadata/declared/DatasetDataSource.java | 2 +- .../metadata/declared/MetadataProvider.java | 11 +- .../operators/logical/AbstractScanOperator.java | 5 + .../logical/AbstractUnnestMapOperator.java | 35 + .../operators/logical/IntersectOperator.java | 73 +- .../operators/logical/UnnestMapOperator.java | 2 +- .../operators/physical/IntersectPOperator.java | 35 +- .../intersect/IntersectOperatorDescriptor.java | 91 +- .../unit/IntersectOperatorDescriptorTest.java | 4 +- 85 files changed, 12367 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 295d874..569eb3d 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ dist *.swp .m2* Ã + http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java index 9dd57d5..2fd9079 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java @@ -115,6 +115,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator { int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas); int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas); + boolean propagateFilter = unnestMap.propagateIndexFilter(); MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); Dataset dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName()); @@ -124,7 +125,7 @@ public class BTreeSearchPOperator extends IndexSearchPOperator { Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = metadataProvider.buildBtreeRuntime( builder.getJobSpec(), opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainMissing, dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(), - jobGenParams.isHighKeyInclusive(), minFilterFieldIndexes, maxFilterFieldIndexes); + jobGenParams.isHighKeyInclusive(), propagateFilter, minFilterFieldIndexes, maxFilterFieldIndexes); builder.contributeHyracksOperator(unnestMap, btreeSearch.first); builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java index ce43480..9f46e6a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java @@ -63,7 +63,7 @@ public abstract class IndexSearchPOperator extends AbstractScanPOperator { IDataSource<?> ds = idx.getDataSource(); IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider(); AbstractScanOperator as = (AbstractScanOperator) op; - deliveredProperties = dspp.computePropertiesVector(as.getVariables()); + deliveredProperties = dspp.computePropertiesVector(as.getScanVariables()); } protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index 50c762e..213c60b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -135,13 +135,16 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { AbstractUnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput, boolean retainMissing, String datasetName, Dataset dataset, String 
indexName, ATypeTag searchKeyType, int[] keyFields, SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold, - int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean isFullTextSearchQuery) - throws AlgebricksException { + int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, + boolean isFullTextSearchQuery) throws AlgebricksException { try { + + boolean propagateIndexFilter = unnestMap.propagateIndexFilter(); IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject(); int numPrimaryKeys = dataset.getPrimaryKeys().size(); - Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), - dataset.getDataverseName(), dataset.getDatasetName(), indexName); + Index secondaryIndex = MetadataManager.INSTANCE + .getIndex(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(), + dataset.getDatasetName(), indexName); if (secondaryIndex == null) { throw new AlgebricksException( "Code generation error: no index " + indexName + " for dataset " + datasetName); @@ -160,13 +163,15 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first); - LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor( - jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, - searchModifierFactory, retainInput, retainMissing, context.getMissingWriterFactory(), - dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex, - ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(), - IndexOperation.SEARCH, null), - minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys, false); + LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = 
+ new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, + dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory, retainInput, + retainMissing, context.getMissingWriterFactory(), + dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), + secondaryIndex, + ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(), + IndexOperation.SEARCH, null), minFilterFieldIndexes, maxFilterFieldIndexes, + isFullTextSearchQuery, numPrimaryKeys, propagateIndexFilter); return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second); } catch (MetadataException e) { throw new AlgebricksException(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java index f9d4c80..733e62f 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java @@ -66,7 +66,7 @@ public class RTreeSearchPOperator extends IndexSearchPOperator { @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { AbstractUnnestMapOperator unnestMap = (AbstractUnnestMapOperator) op; ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue(); if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) { @@ -81,6 +81,7 @@ public class RTreeSearchPOperator extends IndexSearchPOperator { jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments()); int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas); + boolean propagateIndexFilter = unnestMap.propagateIndexFilter(); int[] minFilterFieldIndexes = getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas); int[] maxFilterFieldIndexes = getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas); @@ -97,9 +98,10 @@ public class RTreeSearchPOperator extends IndexSearchPOperator { // By nature, LEFT_OUTER_UNNEST_MAP should generate null values for non-matching tuples. retainNull = true; } - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = mp.buildRtreeRuntime( - builder.getJobSpec(), outputVars, opSchema, typeEnv, context, jobGenParams.getRetainInput(), retainNull, - dataset, jobGenParams.getIndexName(), keyIndexes, minFilterFieldIndexes, maxFilterFieldIndexes); + Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = + mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, opSchema, typeEnv, context, + jobGenParams.getRetainInput(), retainNull, dataset, jobGenParams.getIndexName(), keyIndexes, + propagateIndexFilter, minFilterFieldIndexes, maxFilterFieldIndexes); builder.contributeHyracksOperator(unnestMap, rtreeSearch.first); builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java index 83b277d..95f0de9 
100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java @@ -19,11 +19,16 @@ package org.apache.asterix.optimizer.rules.am; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.logging.Logger; import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.metadata.declared.DataSource; import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -56,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; @@ -63,6 +69,8 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule { + static final Logger LOGGER = Logger.getLogger(IntroduceLSMComponentFilterRule.class.getName()); + protected IVariableTypeEnvironment typeEnvironment = null; @Override @@ -80,12 +88,6 @@ public class IntroduceLSMComponentFilterRule 
implements IAlgebraicRewriteRule { } AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); - typeEnvironment = context.getOutputTypeEnvironment(op); - ILogicalExpression condExpr = ((SelectOperator) op).getCondition().getValue(); - AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, context, typeEnvironment); - if (analysisCtx.getMatchedFuncExprs().isEmpty()) { - return false; - } Dataset dataset = getDataset(op, context); List<String> filterFieldName = null; @@ -101,22 +103,33 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule { if (filterFieldName == null || recType == null) { return false; } - List<Index> datasetIndexes = ((MetadataProvider) context.getMetadataProvider()) - .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()); + + IAType filterType = recType.getSubFieldType(filterFieldName); + + typeEnvironment = context.getOutputTypeEnvironment(op); + ILogicalExpression condExpr = ((SelectOperator) op).getCondition().getValue(); + AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, context, typeEnvironment); List<IOptimizableFuncExpr> optFuncExprs = new ArrayList<>(); - for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) { - IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(i); - boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, recType, datasetIndexes, context); - if (found && optFuncExpr.getFieldName(0).equals(filterFieldName)) { - optFuncExprs.add(optFuncExpr); + if (!analysisCtx.getMatchedFuncExprs().isEmpty()) { + List<Index> datasetIndexes = ((MetadataProvider) context.getMetadataProvider()) + .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()); + + for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) { + IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(i); + boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, recType, datasetIndexes, 
context); + if (found && optFuncExpr.getFieldName(0).equals(filterFieldName)) { + optFuncExprs.add(optFuncExpr); + } } } + if (optFuncExprs.isEmpty()) { - return false; + assignFilterFromSecondaryUnnestMap(op, dataset, context, filterType); + } else { + assignFilterFromQuery(optFuncExprs, op, dataset, context, filterType); } - changePlan(optFuncExprs, op, dataset, context); OperatorPropertiesUtil.typeOpRec(opRef, context); context.addToDontApplySet(this, op); @@ -147,9 +160,11 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule { return new AssignOperator(assignKeyVarList, assignKeyExprList); } - private void changePlan(List<IOptimizableFuncExpr> optFuncExprs, AbstractLogicalOperator op, Dataset dataset, - IOptimizationContext context) throws AlgebricksException { + private void assignFilterFromQuery(List<IOptimizableFuncExpr> optFuncExprs, AbstractLogicalOperator op, + Dataset dataset, IOptimizationContext context, IAType filterType) throws AlgebricksException { + List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>(); + boolean hasSecondaryIndexMap = false; Queue<Mutable<ILogicalOperator>> queue = new LinkedList<>(op.getInputs()); while (!queue.isEmpty()) { AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) queue.poll().getValue(); @@ -176,8 +191,7 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule { dataSourceScanOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - assignOp.getInputs() - .add(new MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue())); + assignOp.getInputs().add(new MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue())); dataSourceScanOp.getInputs().get(0).setValue(assignOp); } } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) { @@ -207,14 +221,153 @@ public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule { .add(new MutableObject<ILogicalExpression>(new 
VariableReferenceExpression(var))); } unnestMapOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - assignOp.getInputs() - .add(new MutableObject<>(unnestMapOp.getInputs().get(0).getValue())); + assignOp.getInputs().add(new MutableObject<>(unnestMapOp.getInputs().get(0).getValue())); unnestMapOp.getInputs().get(0).setValue(assignOp); + + if (jobGenParams.isPrimaryIndex) { + primaryUnnestMapOps.add(unnestMapOp); + } else { + hasSecondaryIndexMap = true; + } } } } queue.addAll(descendantOp.getInputs()); } + if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) { + propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, context); + } + } + + private void propagateFilterToPrimaryIndex(List<UnnestMapOperator> primaryUnnestMapOps, IAType filterType, + IOptimizationContext context) throws AlgebricksException { + for (UnnestMapOperator primaryOp : primaryUnnestMapOps) { + Mutable<ILogicalOperator> assignOrOrderOrIntersect = primaryOp.getInputs().get(0); + Mutable<ILogicalOperator> intersectOrSort = assignOrOrderOrIntersect; + + if (assignOrOrderOrIntersect.getValue().getOperatorTag() == LogicalOperatorTag.ASSIGN) { + intersectOrSort = assignOrOrderOrIntersect.getValue().getInputs().get(0); + } + + switch (intersectOrSort.getValue().getOperatorTag()) { + case INTERSECT: + IntersectOperator intersect = (IntersectOperator) (intersectOrSort.getValue()); + List<List<LogicalVariable>> filterVars = new ArrayList<>(intersect.getInputs().size()); + for (Mutable<ILogicalOperator> mutableOp : intersect.getInputs()) { + ILogicalOperator child = mutableOp.getValue(); + while (!child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP)) { + child = child.getInputs().get(0).getValue(); + } + UnnestMapOperator unnestMap = (UnnestMapOperator) child; + propagateFilterInSecondaryUnnsetMap(unnestMap, filterType, context); + + List<LogicalVariable> extraVars = Arrays.asList(unnestMap.getPropagateIndexMinFilterVar(), + 
unnestMap.getPropagateIndexMaxFilterVar()); + filterVars.add(extraVars); + } + if (!filterVars.isEmpty()) { + List<LogicalVariable> outputFilterVars = new ArrayList<>(filterVars.get(0)); + IntersectOperator intersectWithFilter = + createIntersectWithFilter(outputFilterVars, filterVars, intersect); + + intersectOrSort.setValue(intersectWithFilter); + context.computeAndSetTypeEnvironmentForOperator(intersectWithFilter); + setPrimaryFilterVar(primaryOp, outputFilterVars.get(0), outputFilterVars.get(1), context); + } + break; + case ORDER: + ILogicalOperator child = intersectOrSort.getValue().getInputs().get(0).getValue(); + if (child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP)) { + UnnestMapOperator secondaryMap = (UnnestMapOperator) child; + + propagateFilterInSecondaryUnnsetMap(secondaryMap, filterType, context); + + setPrimaryFilterVar(primaryOp, secondaryMap.getPropagateIndexMinFilterVar(), + secondaryMap.getPropagateIndexMaxFilterVar(), context); + } + break; + default: + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, + intersectOrSort.getValue().getOperatorTag().toString()); + } + } + } + + private IntersectOperator createIntersectWithFilter(List<LogicalVariable> outputFilterVars, + List<List<LogicalVariable>> filterVars, IntersectOperator intersect) throws AlgebricksException { + List<LogicalVariable> outputVars = new ArrayList<>(); + outputVars.addAll(intersect.getOutputVars()); + outputVars.addAll(outputFilterVars); + + List<List<LogicalVariable>> compareVars = new ArrayList<>(intersect.getNumInput()); + for (int i = 0; i < intersect.getNumInput(); i++) { + compareVars.add(new ArrayList<>(intersect.getCompareVariables(i))); + } + + IntersectOperator intersectWithFilter = new IntersectOperator(outputVars, compareVars, filterVars); + intersectWithFilter.getInputs().addAll(intersect.getInputs()); + return intersectWithFilter; + } + + private void propagateFilterInSecondaryUnnsetMap(UnnestMapOperator secondaryUnnest, IAType 
filterType, + IOptimizationContext context) throws AlgebricksException { + + LogicalVariable minIndexFilterVar = context.newVar(); + LogicalVariable maxIndexFilterVar = context.newVar(); + secondaryUnnest.markPropagageIndexFilter(); + secondaryUnnest.getVariables().add(minIndexFilterVar); + secondaryUnnest.getVariableTypes().add(filterType); + secondaryUnnest.getVariables().add(maxIndexFilterVar); + secondaryUnnest.getVariableTypes().add(filterType); + + context.computeAndSetTypeEnvironmentForOperator(secondaryUnnest); + } + + private void setPrimaryFilterVar(UnnestMapOperator primaryOp, LogicalVariable minFilterVar, + LogicalVariable maxFilterVar, IOptimizationContext context) throws AlgebricksException { + primaryOp.setMinFilterVars(Collections.singletonList(minFilterVar)); + primaryOp.setMaxFilterVars(Collections.singletonList(maxFilterVar)); + + List<Mutable<ILogicalExpression>> indexFilterExpression = + Arrays.asList(new MutableObject<>(new VariableReferenceExpression(minFilterVar)), + new MutableObject<>(new VariableReferenceExpression(maxFilterVar))); + + primaryOp.setAdditionalFilteringExpressions(indexFilterExpression); + context.computeAndSetTypeEnvironmentForOperator(primaryOp); + } + + private void assignFilterFromSecondaryUnnestMap(AbstractLogicalOperator op, Dataset dataset, + IOptimizationContext context, IAType filterType) throws AlgebricksException { + List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>(); + boolean hasSecondaryIndexMap = false; + Queue<Mutable<ILogicalOperator>> queue = new LinkedList<>(op.getInputs()); + while (!queue.isEmpty()) { + ILogicalOperator descendantOp = queue.poll().getValue(); + if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) { + UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp; + ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue(); + if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { + AbstractFunctionCallExpression f = 
(AbstractFunctionCallExpression) unnestExpr; + FunctionIdentifier fid = f.getFunctionIdentifier(); + if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, fid.getName()); + } + AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams(); + jobGenParams.readFromFuncArgs(f.getArguments()); + if (dataset.getDatasetName().compareTo(jobGenParams.datasetName) == 0) { + if (jobGenParams.isPrimaryIndex) { + primaryUnnestMapOps.add(unnestMapOp); + } else { + hasSecondaryIndexMap = true; + } + } + } + } + queue.addAll(descendantOp.getInputs()); + } + if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) { + propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, context); + } } private Dataset getDataset(AbstractLogicalOperator op, IOptimizationContext context) throws AlgebricksException {
