Change logical plan to apply filter from 2ndary index

- Changes the IntroduceLSMComponentFilterRule to
replace the constant filter value from the query to the value
carried from 2ndary index search.
- Can use 2ndary index filter even the query doens't contain
any filter related condition.

Change-Id: I0e2fe0208662e5dcd49d1a22bfb58f96533e9497
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1727
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
BAD: Jenkins <[email protected]>
Reviewed-by: Yingyi Bu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/de0ece7f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/de0ece7f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/de0ece7f

Branch: refs/heads/master
Commit: de0ece7f151cd26cf3bf455fd0aa95df51f35d55
Parents: 8cbb05c
Author: Jianfeng Jia <[email protected]>
Authored: Tue Jun 6 22:43:26 2017 -0700
Committer: Jianfeng Jia <[email protected]>
Committed: Wed Jun 7 11:41:56 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 .../physical/BTreeSearchPOperator.java          |    3 +-
 .../physical/IndexSearchPOperator.java          |    2 +-
 .../physical/InvertedIndexPOperator.java        |   27 +-
 .../physical/RTreeSearchPOperator.java          |   10 +-
 .../am/IntroduceLSMComponentFilterRule.java     |  195 +-
 asterixdb/asterix-app/data/twitter/real.2.adm   | 5000 ++++++++++++++++++
 asterixdb/asterix-app/data/twitter/real.adm     | 5000 ++++++++++++++++++
 .../btree-btree-search-wo-query-filter.aql      |   38 +
 .../queries/filter/btree-btree-search.aql       |   42 +
 .../inverted-btree-search-wo-query-filter.aql   |   40 +
 .../queries/filter/inverted-btree-search.aql    |   43 +
 ...multi-index-btree-search-wo-query-filter.aql |   45 +
 .../queries/filter/multi-index-btree-search.aql |   48 +
 .../rtree-btree-search-wo-query-filter.aql      |   40 +
 .../queries/filter/rtree-btree-search.aql       |   43 +
 .../btree-btree-search-wo-query-filter.plan     |   15 +
 .../results/filter/btree-btree-search.plan      |   17 +
 .../inverted-btree-search-wo-query-filter.plan  |   13 +
 .../results/filter/inverted-btree-search.plan   |   15 +
 ...ulti-index-btree-search-wo-query-filter.plan |   33 +
 .../filter/multi-index-btree-search.plan        |   35 +
 .../rtree-btree-search-wo-query-filter.plan     |   15 +
 .../results/filter/rtree-btree-search.plan      |   17 +
 ...btree-rtree-ngram-intersect-with-filter.plan |   36 +-
 .../queries/filters/delete/delete.1.ddl.aql     |   96 +
 .../queries/filters/delete/delete.10.query.aql  |   27 +
 .../queries/filters/delete/delete.2.update.aql  |   25 +
 .../queries/filters/delete/delete.3.server.aql  |   26 +
 .../queries/filters/delete/delete.4.sleep.aql   |   22 +
 .../queries/filters/delete/delete.5.server.aql  |   22 +
 .../queries/filters/delete/delete.6.update.aql  |   22 +
 .../queries/filters/delete/delete.7.server.aql  |   23 +
 .../queries/filters/delete/delete.8.sleep.aql   |   20 +
 .../queries/filters/delete/delete.9.server.aql  |   20 +
 .../queries/filters/upsert/upsert.1.ddl.aql     |   96 +
 .../queries/filters/upsert/upsert.10.query.aql  |   26 +
 .../queries/filters/upsert/upsert.11.ddl.aql    |   20 +
 .../queries/filters/upsert/upsert.2.update.aql  |   27 +
 .../queries/filters/upsert/upsert.3.server.aql  |   26 +
 .../queries/filters/upsert/upsert.4.sleep.aql   |   22 +
 .../queries/filters/upsert/upsert.5.server.aql  |   22 +
 .../queries/filters/upsert/upsert.6.update.aql  |   55 +
 .../queries/filters/upsert/upsert.7.server.aql  |   23 +
 .../queries/filters/upsert/upsert.8.sleep.aql   |   20 +
 .../queries/filters/upsert/upsert.9.server.aql  |   20 +
 .../tinysocial-intersect.1.ddl.aql              |   46 +
 .../tinysocial-intersect.2.update.aql           |   23 +
 .../tinysocial-intersect.3.query.aql            |   28 +
 .../filters/delete/delete.1.ddl.sqlpp           |   96 +
 .../filters/delete/delete.10.query.sqlpp        |   23 +
 .../filters/delete/delete.11.ddl.sqlpp          |   27 +
 .../filters/delete/delete.2.update.sqlpp        |   24 +
 .../filters/delete/delete.3.server.sqlpp        |   26 +
 .../filters/delete/delete.4.sleep.sqlpp         |   20 +
 .../filters/delete/delete.5.server.sqlpp        |   20 +
 .../filters/delete/delete.6.update.sqlpp        |   22 +
 .../filters/delete/delete.7.server.sqlpp        |   23 +
 .../filters/delete/delete.8.sleep.sqlpp         |   20 +
 .../filters/delete/delete.9.server.sqlpp        |   20 +
 .../filters/upsert/upsert.1.ddl.sqlpp           |   97 +
 .../filters/upsert/upsert.10.query.sqlpp        |   23 +
 .../filters/upsert/upsert.11.ddl.sqlpp          |   27 +
 .../filters/upsert/upsert.2.update.sqlpp        |   26 +
 .../filters/upsert/upsert.3.server.sqlpp        |   26 +
 .../filters/upsert/upsert.4.sleep.sqlpp         |   20 +
 .../filters/upsert/upsert.5.server.sqlpp        |   20 +
 .../filters/upsert/upsert.6.update.sqlpp        |   53 +
 .../filters/upsert/upsert.7.server.sqlpp        |   23 +
 .../filters/upsert/upsert.8.sleep.sqlpp         |   20 +
 .../filters/upsert/upsert.9.server.sqlpp        |   20 +
 .../results/filters/delete/delete.1.adm         |    0
 .../results/filters/upsert/upsert.1.adm         |    1 +
 .../intersection-with-filter/intersection.1.adm |    1 +
 .../src/test/resources/runtimets/testsuite.xml  |   15 +
 .../resources/runtimets/testsuite_sqlpp.xml     |   10 +
 .../metadata/declared/DatasetDataSource.java    |    2 +-
 .../metadata/declared/MetadataProvider.java     |   11 +-
 .../operators/logical/AbstractScanOperator.java |    5 +
 .../logical/AbstractUnnestMapOperator.java      |   35 +
 .../operators/logical/IntersectOperator.java    |   73 +-
 .../operators/logical/UnnestMapOperator.java    |    2 +-
 .../operators/physical/IntersectPOperator.java  |   35 +-
 .../intersect/IntersectOperatorDescriptor.java  |   91 +-
 .../unit/IntersectOperatorDescriptorTest.java   |    4 +-
 85 files changed, 12367 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 295d874..569eb3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ dist
 *.swp
 .m2*
 ß
+

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 9dd57d5..2fd9079 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -115,6 +115,7 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
 
         int[] minFilterFieldIndexes = 
getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
         int[] maxFilterFieldIndexes = 
getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
+        boolean propagateFilter = unnestMap.propagateIndexFilter();
 
         MetadataProvider metadataProvider = (MetadataProvider) 
context.getMetadataProvider();
         Dataset dataset = 
metadataProvider.findDataset(jobGenParams.getDataverseName(), 
jobGenParams.getDatasetName());
@@ -124,7 +125,7 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = 
metadataProvider.buildBtreeRuntime(
                 builder.getJobSpec(), opSchema, typeEnv, context, 
jobGenParams.getRetainInput(), retainMissing,
                 dataset, jobGenParams.getIndexName(), lowKeyIndexes, 
highKeyIndexes, jobGenParams.isLowKeyInclusive(),
-                jobGenParams.isHighKeyInclusive(), minFilterFieldIndexes, 
maxFilterFieldIndexes);
+                jobGenParams.isHighKeyInclusive(), propagateFilter, 
minFilterFieldIndexes, maxFilterFieldIndexes);
 
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, 
btreeSearch.second);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
index ce43480..9f46e6a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -63,7 +63,7 @@ public abstract class IndexSearchPOperator extends 
AbstractScanPOperator {
         IDataSource<?> ds = idx.getDataSource();
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computePropertiesVector(as.getVariables());
+        deliveredProperties = 
dspp.computePropertiesVector(as.getScanVariables());
     }
 
     protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, 
IOperatorSchema[] inputSchemas) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 50c762e..213c60b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -135,13 +135,16 @@ public class InvertedIndexPOperator extends 
IndexSearchPOperator {
             AbstractUnnestMapOperator unnestMap, IOperatorSchema opSchema, 
boolean retainInput, boolean retainMissing,
             String datasetName, Dataset dataset, String indexName, ATypeTag 
searchKeyType, int[] keyFields,
             SearchModifierType searchModifierType, IAlgebricksConstantValue 
similarityThreshold,
-            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean 
isFullTextSearchQuery)
-            throws AlgebricksException {
+            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
+            boolean isFullTextSearchQuery) throws AlgebricksException {
         try {
+
+            boolean propagateIndexFilter = unnestMap.propagateIndexFilter();
             IAObject simThresh = ((AsterixConstantValue) 
similarityThreshold).getObject();
             int numPrimaryKeys = dataset.getPrimaryKeys().size();
-            Index secondaryIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                    dataset.getDataverseName(), dataset.getDatasetName(), 
indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE
+                    .getIndex(metadataProvider.getMetadataTxnContext(), 
dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
                 throw new AlgebricksException(
                         "Code generation error: no index " + indexName + " for 
dataset " + datasetName);
@@ -160,13 +163,15 @@ public class InvertedIndexPOperator extends 
IndexSearchPOperator {
             IIndexDataflowHelperFactory dataflowHelperFactory =
                     new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                             secondarySplitsAndConstraint.first);
-            LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new 
LSMInvertedIndexSearchOperatorDescriptor(
-                    jobSpec, outputRecDesc, queryField, dataflowHelperFactory, 
queryTokenizerFactory,
-                    searchModifierFactory, retainInput, retainMissing, 
context.getMissingWriterFactory(),
-                    
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
 secondaryIndex,
-                            ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId(),
-                            IndexOperation.SEARCH, null),
-                    minFilterFieldIndexes, maxFilterFieldIndexes, 
isFullTextSearchQuery, numPrimaryKeys, false);
+            LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+                    new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, 
outputRecDesc, queryField,
+                            dataflowHelperFactory, queryTokenizerFactory, 
searchModifierFactory, retainInput,
+                            retainMissing, context.getMissingWriterFactory(),
+                            
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
+                                    secondaryIndex,
+                                    ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId(),
+                                    IndexOperation.SEARCH, null), 
minFilterFieldIndexes, maxFilterFieldIndexes,
+                            isFullTextSearchQuery, numPrimaryKeys, 
propagateIndexFilter);
             return new Pair<>(invIndexSearchOp, 
secondarySplitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index f9d4c80..733e62f 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -66,7 +66,7 @@ public class RTreeSearchPOperator extends 
IndexSearchPOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         AbstractUnnestMapOperator unnestMap = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = 
unnestMap.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
@@ -81,6 +81,7 @@ public class RTreeSearchPOperator extends 
IndexSearchPOperator {
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), 
inputSchemas);
 
+        boolean propagateIndexFilter = unnestMap.propagateIndexFilter();
         int[] minFilterFieldIndexes = 
getKeyIndexes(unnestMap.getMinFilterVars(), inputSchemas);
         int[] maxFilterFieldIndexes = 
getKeyIndexes(unnestMap.getMaxFilterVars(), inputSchemas);
 
@@ -97,9 +98,10 @@ public class RTreeSearchPOperator extends 
IndexSearchPOperator {
             // By nature, LEFT_OUTER_UNNEST_MAP should generate null values 
for non-matching tuples.
             retainNull = true;
         }
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = 
mp.buildRtreeRuntime(
-                builder.getJobSpec(), outputVars, opSchema, typeEnv, context, 
jobGenParams.getRetainInput(), retainNull,
-                dataset, jobGenParams.getIndexName(), keyIndexes, 
minFilterFieldIndexes, maxFilterFieldIndexes);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
+                mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, 
opSchema, typeEnv, context,
+                        jobGenParams.getRetainInput(), retainNull, dataset, 
jobGenParams.getIndexName(), keyIndexes,
+                        propagateIndexFilter, minFilterFieldIndexes, 
maxFilterFieldIndexes);
 
         builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, 
rtreeSearch.second);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/de0ece7f/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index 83b277d..95f0de9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -19,11 +19,16 @@
 package org.apache.asterix.optimizer.rules.am;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -56,6 +61,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -63,6 +69,8 @@ import 
org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class IntroduceLSMComponentFilterRule implements IAlgebraicRewriteRule {
 
+    static final Logger LOGGER = 
Logger.getLogger(IntroduceLSMComponentFilterRule.class.getName());
+
     protected IVariableTypeEnvironment typeEnvironment = null;
 
     @Override
@@ -80,12 +88,6 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
         }
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
-        typeEnvironment = context.getOutputTypeEnvironment(op);
-        ILogicalExpression condExpr = ((SelectOperator) 
op).getCondition().getValue();
-        AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, 
context, typeEnvironment);
-        if (analysisCtx.getMatchedFuncExprs().isEmpty()) {
-            return false;
-        }
 
         Dataset dataset = getDataset(op, context);
         List<String> filterFieldName = null;
@@ -101,22 +103,33 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
         if (filterFieldName == null || recType == null) {
             return false;
         }
-        List<Index> datasetIndexes = ((MetadataProvider) 
context.getMetadataProvider())
-                .getDatasetIndexes(dataset.getDataverseName(), 
dataset.getDatasetName());
+
+        IAType filterType = recType.getSubFieldType(filterFieldName);
+
+        typeEnvironment = context.getOutputTypeEnvironment(op);
+        ILogicalExpression condExpr = ((SelectOperator) 
op).getCondition().getValue();
+        AccessMethodAnalysisContext analysisCtx = analyzeCondition(condExpr, 
context, typeEnvironment);
 
         List<IOptimizableFuncExpr> optFuncExprs = new ArrayList<>();
 
-        for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) {
-            IOptimizableFuncExpr optFuncExpr = 
analysisCtx.getMatchedFuncExpr(i);
-            boolean found = findMacthedExprFieldName(optFuncExpr, op, dataset, 
recType, datasetIndexes, context);
-            if (found && optFuncExpr.getFieldName(0).equals(filterFieldName)) {
-                optFuncExprs.add(optFuncExpr);
+        if (!analysisCtx.getMatchedFuncExprs().isEmpty()) {
+            List<Index> datasetIndexes = ((MetadataProvider) 
context.getMetadataProvider())
+                    .getDatasetIndexes(dataset.getDataverseName(), 
dataset.getDatasetName());
+
+            for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) 
{
+                IOptimizableFuncExpr optFuncExpr = 
analysisCtx.getMatchedFuncExpr(i);
+                boolean found = findMacthedExprFieldName(optFuncExpr, op, 
dataset, recType, datasetIndexes, context);
+                if (found && 
optFuncExpr.getFieldName(0).equals(filterFieldName)) {
+                    optFuncExprs.add(optFuncExpr);
+                }
             }
         }
+
         if (optFuncExprs.isEmpty()) {
-            return false;
+            assignFilterFromSecondaryUnnestMap(op, dataset, context, 
filterType);
+        } else {
+            assignFilterFromQuery(optFuncExprs, op, dataset, context, 
filterType);
         }
-        changePlan(optFuncExprs, op, dataset, context);
 
         OperatorPropertiesUtil.typeOpRec(opRef, context);
         context.addToDontApplySet(this, op);
@@ -147,9 +160,11 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
         return new AssignOperator(assignKeyVarList, assignKeyExprList);
     }
 
-    private void changePlan(List<IOptimizableFuncExpr> optFuncExprs, 
AbstractLogicalOperator op, Dataset dataset,
-            IOptimizationContext context) throws AlgebricksException {
+    private void assignFilterFromQuery(List<IOptimizableFuncExpr> 
optFuncExprs, AbstractLogicalOperator op,
+            Dataset dataset, IOptimizationContext context, IAType filterType) 
throws AlgebricksException {
 
+        List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>();
+        boolean hasSecondaryIndexMap = false;
         Queue<Mutable<ILogicalOperator>> queue = new 
LinkedList<>(op.getInputs());
         while (!queue.isEmpty()) {
             AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
queue.poll().getValue();
@@ -176,8 +191,7 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
 
                     
dataSourceScanOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
 
-                    assignOp.getInputs()
-                            .add(new 
MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue()));
+                    assignOp.getInputs().add(new 
MutableObject<>(dataSourceScanOp.getInputs().get(0).getValue()));
                     dataSourceScanOp.getInputs().get(0).setValue(assignOp);
                 }
             } else if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.UNNEST_MAP) {
@@ -207,14 +221,153 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
                                     .add(new 
MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
                         }
                         
unnestMapOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-                        assignOp.getInputs()
-                                .add(new 
MutableObject<>(unnestMapOp.getInputs().get(0).getValue()));
+                        assignOp.getInputs().add(new 
MutableObject<>(unnestMapOp.getInputs().get(0).getValue()));
                         unnestMapOp.getInputs().get(0).setValue(assignOp);
+
+                        if (jobGenParams.isPrimaryIndex) {
+                            primaryUnnestMapOps.add(unnestMapOp);
+                        } else {
+                            hasSecondaryIndexMap = true;
+                        }
                     }
                 }
             }
             queue.addAll(descendantOp.getInputs());
         }
+        if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) {
+            propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, 
context);
+        }
+    }
+
+    private void propagateFilterToPrimaryIndex(List<UnnestMapOperator> 
primaryUnnestMapOps, IAType filterType,
+            IOptimizationContext context) throws AlgebricksException {
+        for (UnnestMapOperator primaryOp : primaryUnnestMapOps) {
+            Mutable<ILogicalOperator> assignOrOrderOrIntersect = 
primaryOp.getInputs().get(0);
+            Mutable<ILogicalOperator> intersectOrSort = 
assignOrOrderOrIntersect;
+
+            if (assignOrOrderOrIntersect.getValue().getOperatorTag() == 
LogicalOperatorTag.ASSIGN) {
+                intersectOrSort = 
assignOrOrderOrIntersect.getValue().getInputs().get(0);
+            }
+
+            switch (intersectOrSort.getValue().getOperatorTag()) {
+                case INTERSECT:
+                    IntersectOperator intersect = (IntersectOperator) 
(intersectOrSort.getValue());
+                    List<List<LogicalVariable>> filterVars = new 
ArrayList<>(intersect.getInputs().size());
+                    for (Mutable<ILogicalOperator> mutableOp : 
intersect.getInputs()) {
+                        ILogicalOperator child = mutableOp.getValue();
+                        while 
(!child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP)) {
+                            child = child.getInputs().get(0).getValue();
+                        }
+                        UnnestMapOperator unnestMap = (UnnestMapOperator) 
child;
+                        propagateFilterInSecondaryUnnsetMap(unnestMap, 
filterType, context);
+
+                        List<LogicalVariable> extraVars = 
Arrays.asList(unnestMap.getPropagateIndexMinFilterVar(),
+                                unnestMap.getPropagateIndexMaxFilterVar());
+                        filterVars.add(extraVars);
+                    }
+                    if (!filterVars.isEmpty()) {
+                        List<LogicalVariable> outputFilterVars = new 
ArrayList<>(filterVars.get(0));
+                        IntersectOperator intersectWithFilter =
+                                createIntersectWithFilter(outputFilterVars, 
filterVars, intersect);
+
+                        intersectOrSort.setValue(intersectWithFilter);
+                        
context.computeAndSetTypeEnvironmentForOperator(intersectWithFilter);
+                        setPrimaryFilterVar(primaryOp, 
outputFilterVars.get(0), outputFilterVars.get(1), context);
+                    }
+                    break;
+                case ORDER:
+                    ILogicalOperator child = 
intersectOrSort.getValue().getInputs().get(0).getValue();
+                    if 
(child.getOperatorTag().equals(LogicalOperatorTag.UNNEST_MAP)) {
+                        UnnestMapOperator secondaryMap = (UnnestMapOperator) 
child;
+
+                        propagateFilterInSecondaryUnnsetMap(secondaryMap, 
filterType, context);
+
+                        setPrimaryFilterVar(primaryOp, 
secondaryMap.getPropagateIndexMinFilterVar(),
+                                secondaryMap.getPropagateIndexMaxFilterVar(), 
context);
+                    }
+                    break;
+                default:
+                    throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                            
intersectOrSort.getValue().getOperatorTag().toString());
+            }
+        }
+    }
+
+    private IntersectOperator createIntersectWithFilter(List<LogicalVariable> 
outputFilterVars,
+            List<List<LogicalVariable>> filterVars, IntersectOperator 
intersect) throws AlgebricksException {
+        List<LogicalVariable> outputVars = new ArrayList<>();
+        outputVars.addAll(intersect.getOutputVars());
+        outputVars.addAll(outputFilterVars);
+
+        List<List<LogicalVariable>> compareVars = new 
ArrayList<>(intersect.getNumInput());
+        for (int i = 0; i < intersect.getNumInput(); i++) {
+            compareVars.add(new ArrayList<>(intersect.getCompareVariables(i)));
+        }
+
+        IntersectOperator intersectWithFilter = new 
IntersectOperator(outputVars, compareVars, filterVars);
+        intersectWithFilter.getInputs().addAll(intersect.getInputs());
+        return intersectWithFilter;
+    }
+
+    private void propagateFilterInSecondaryUnnsetMap(UnnestMapOperator 
secondaryUnnest, IAType filterType,
+            IOptimizationContext context) throws AlgebricksException {
+
+        LogicalVariable minIndexFilterVar = context.newVar();
+        LogicalVariable maxIndexFilterVar = context.newVar();
+        secondaryUnnest.markPropagageIndexFilter();
+        secondaryUnnest.getVariables().add(minIndexFilterVar);
+        secondaryUnnest.getVariableTypes().add(filterType);
+        secondaryUnnest.getVariables().add(maxIndexFilterVar);
+        secondaryUnnest.getVariableTypes().add(filterType);
+
+        context.computeAndSetTypeEnvironmentForOperator(secondaryUnnest);
+    }
+
+    private void setPrimaryFilterVar(UnnestMapOperator primaryOp, 
LogicalVariable minFilterVar,
+            LogicalVariable maxFilterVar, IOptimizationContext context) throws 
AlgebricksException {
+        primaryOp.setMinFilterVars(Collections.singletonList(minFilterVar));
+        primaryOp.setMaxFilterVars(Collections.singletonList(maxFilterVar));
+
+        List<Mutable<ILogicalExpression>> indexFilterExpression =
+                Arrays.asList(new MutableObject<>(new 
VariableReferenceExpression(minFilterVar)),
+                        new MutableObject<>(new 
VariableReferenceExpression(maxFilterVar)));
+
+        primaryOp.setAdditionalFilteringExpressions(indexFilterExpression);
+        context.computeAndSetTypeEnvironmentForOperator(primaryOp);
+    }
+
+    private void assignFilterFromSecondaryUnnestMap(AbstractLogicalOperator 
op, Dataset dataset,
+            IOptimizationContext context, IAType filterType) throws 
AlgebricksException {
+        List<UnnestMapOperator> primaryUnnestMapOps = new ArrayList<>();
+        boolean hasSecondaryIndexMap = false;
+        Queue<Mutable<ILogicalOperator>> queue = new 
LinkedList<>(op.getInputs());
+        while (!queue.isEmpty()) {
+            ILogicalOperator descendantOp = queue.poll().getValue();
+            if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.UNNEST_MAP) {
+                UnnestMapOperator unnestMapOp = (UnnestMapOperator) 
descendantOp;
+                ILogicalExpression unnestExpr = 
unnestMapOp.getExpressionRef().getValue();
+                if (unnestExpr.getExpressionTag() == 
LogicalExpressionTag.FUNCTION_CALL) {
+                    AbstractFunctionCallExpression f = 
(AbstractFunctionCallExpression) unnestExpr;
+                    FunctionIdentifier fid = f.getFunctionIdentifier();
+                    if (!fid.equals(BuiltinFunctions.INDEX_SEARCH)) {
+                        throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, fid.getName());
+                    }
+                    AccessMethodJobGenParams jobGenParams = new 
AccessMethodJobGenParams();
+                    jobGenParams.readFromFuncArgs(f.getArguments());
+                    if 
(dataset.getDatasetName().compareTo(jobGenParams.datasetName) == 0) {
+                        if (jobGenParams.isPrimaryIndex) {
+                            primaryUnnestMapOps.add(unnestMapOp);
+                        } else {
+                            hasSecondaryIndexMap = true;
+                        }
+                    }
+                }
+            }
+            queue.addAll(descendantOp.getInputs());
+        }
+        if (hasSecondaryIndexMap && !primaryUnnestMapOps.isEmpty()) {
+            propagateFilterToPrimaryIndex(primaryUnnestMapOps, filterType, 
context);
+        }
     }
 
     private Dataset getDataset(AbstractLogicalOperator op, 
IOptimizationContext context) throws AlgebricksException {

Reply via email to