This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ca7927f  [ASTERIXDB-2771][*DB] Enabling LSM filters on datasets with 
meta records
ca7927f is described below

commit ca7927f7393ad3593279939bd501c2baaa499e39
Author: Xikui Wang <[email protected]>
AuthorDate: Thu Sep 3 11:27:57 2020 -0700

    [ASTERIXDB-2771][*DB] Enabling LSM filters on datasets with meta records
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This patch enables LSM filters on datasets with meta records. It
    introduces a filterSourceIndicator that indicates where the filter value
    comes from, (null - no filter, 0 - from the record, 1 - from the meta
    record). In LangExpressionToPlanTranslator, filter and meta handling
    are pushed into the translate(Insert/Upsert/Delete) methods separately.
    Currently, only UPSERTs are allowed on datasets with meta records, and
    only in this case, the filter value may come from the meta records.
    This patch also renamed an existing test case with where on change feed
    to avoid confuison. Legacy datasets without filterSourceIndicator will
    have 0 (pointing to record) by default.
    
    Change-Id: I6189169cafab9d99b8662ec91cbdd801cfae9dba
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7647
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../rules/am/IntroduceLSMComponentFilterRule.java  |  67 ++++---
 .../translator/LangExpressionToPlanTranslator.java | 220 +++++++++++----------
 .../asterix/translator/util/ValidateUtil.java      |  15 +-
 .../asterix/app/translator/QueryTranslator.java    |   8 +-
 .../asterix/app/bootstrap/TestNodeController.java  |   4 +-
 .../dataflow/CheckpointInSecondaryIndexTest.java   |   6 +-
 .../dataflow/GlobalVirtualBufferCacheTest.java     |   8 +-
 .../test/dataflow/MultiPartitionLSMIndexTest.java  |   6 +-
 .../dataflow/SearchCursorComponentSwitchTest.java  |   6 +-
 .../asterix/test/dataflow/StorageTestUtils.java    |   2 +-
 .../apache/asterix/test/dataflow/TestDataset.java  |   4 +-
 .../storage/IndexDropOperatorNodePushableTest.java |   6 +-
 .../queries/filter_on_meta_0.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_1.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_2.sqlpp}                |  23 ++-
 .../queries/filter_on_meta_3.sqlpp}                |  24 ++-
 .../queries/filter_on_meta_4.sqlpp}                |  24 ++-
 .../queries/filter_on_meta_5.sqlpp}                |  25 ++-
 .../optimizerts/results/filter_on_meta_0.plan      |  11 ++
 .../optimizerts/results/filter_on_meta_1.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_2.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_3.plan      |  11 ++
 .../optimizerts/results/filter_on_meta_4.plan      |  12 ++
 .../optimizerts/results/filter_on_meta_5.plan      |  12 ++
 .../change-feed-filter-on-meta-dataset.1.ddl.sqlpp |  51 +++++
 ...nge-feed-filter-on-meta-dataset.2.update.sqlpp} |   6 +-
 ...ange-feed-filter-on-meta-dataset.3.query.sqlpp} |   4 +-
 ...change-feed-filter-on-meta-dataset.4.ddl.sqlpp} |   3 +-
 .../change-feed-with-where-on-meta.1.ddl.sqlpp}    |   0
 ...change-feed-with-where-on-meta.10.update.sqlpp} |   0
 .../change-feed-with-where-on-meta.11.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.12.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.13.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.14.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.15.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.16.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.17.query.sqlpp} |   0
 .../change-feed-with-where-on-meta.18.ddl.sqlpp}   |   0
 .../change-feed-with-where-on-meta.2.update.sqlpp} |   0
 .../change-feed-with-where-on-meta.3.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.4.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.5.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.6.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.7.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.8.query.sqlpp}  |   0
 .../change-feed-with-where-on-meta.9.query.sqlpp}  |   0
 .../change-feed-filter-on-meta-dataset.1.adm       |   1 +
 .../change-feed-with-filter-on-meta.11.adm         |   0
 .../change-feed-with-filter-on-meta.12.adm         |   0
 .../change-feed-with-filter-on-meta.13.adm         |   0
 .../change-feed-with-filter-on-meta.14.adm         |   0
 .../change-feed-with-filter-on-meta.15.adm         |   0
 .../change-feed-with-filter-on-meta.16.adm         |   0
 .../change-feed-with-filter-on-meta.17.adm         |   0
 .../change-feed-with-filter-on-meta.3.adm          |   0
 .../change-feed-with-filter-on-meta.4.adm          |   0
 .../change-feed-with-filter-on-meta.5.adm          |   0
 .../change-feed-with-filter-on-meta.6.adm          |   0
 .../change-feed-with-filter-on-meta.7.adm          |   0
 .../change-feed-with-filter-on-meta.8.adm          |   0
 .../change-feed-with-filter-on-meta.9.adm          |   0
 .../test/resources/runtimets/testsuite_sqlpp.xml   |   9 +-
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj  |   5 +-
 .../lang/common/statement/InternalDetailsDecl.java |  15 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj    |   8 +-
 .../metadata/bootstrap/MetadataBootstrap.java      |   2 +-
 .../metadata/declared/MetadataProvider.java        |  13 +-
 .../apache/asterix/metadata/entities/Dataset.java  |   4 +-
 .../metadata/entities/InternalDatasetDetails.java  |  32 ++-
 .../DatasetTupleTranslator.java                    |  17 +-
 .../apache/asterix/metadata/utils/DatasetUtil.java |  29 ++-
 .../utils/SecondaryBTreeOperationsHelper.java      |   9 +-
 .../SecondaryCorrelatedBTreeOperationsHelper.java  |   9 +-
 .../utils/SecondaryIndexOperationsHelper.java      |   4 +-
 .../DatasetTupleTranslatorTest.java                |   2 +-
 .../IndexTupleTranslatorTest.java                  |   2 +-
 .../LSMPrimaryUpsertOperatorDescriptor.java        |  14 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java      |  27 ++-
 80 files changed, 611 insertions(+), 221 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index 95b7e17..c49670e 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -89,21 +89,26 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
 
         Dataset dataset = getDataset(op, context);
+        Integer filterSourceIndicator = null;
         List<String> filterFieldName = null;
-        ARecordType recType = null;
+        ARecordType itemType = null;
         MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
         if (dataset != null && dataset.getDatasetType() == 
DatasetType.INTERNAL) {
+            filterSourceIndicator = 
DatasetUtil.getFilterSourceIndicator(dataset);
             filterFieldName = DatasetUtil.getFilterField(dataset);
-            IAType itemType = mp.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
-            if (itemType.getTypeTag() == ATypeTag.OBJECT) {
-                recType = (ARecordType) itemType;
+            IAType filterSourceType = filterSourceIndicator == null || 
filterSourceIndicator == 0
+                    ? mp.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName())
+                    : mp.findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
+
+            if (filterSourceType.getTypeTag() == ATypeTag.OBJECT) {
+                itemType = (ARecordType) filterSourceType;
             }
         }
-        if (filterFieldName == null || recType == null) {
+        if (filterFieldName == null || itemType == null) {
             return false;
         }
 
-        IAType filterType = recType.getSubFieldType(filterFieldName);
+        IAType filterType = itemType.getSubFieldType(filterFieldName);
 
         typeEnvironment = context.getOutputTypeEnvironment(op);
         ILogicalExpression condExpr = ((SelectOperator) 
op).getCondition().getValue();
@@ -116,10 +121,11 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
 
             for (int i = 0; i < analysisCtx.getMatchedFuncExprs().size(); i++) 
{
                 IOptimizableFuncExpr optFuncExpr = 
analysisCtx.getMatchedFuncExpr(i);
-                boolean found = findMacthedExprFieldName(optFuncExpr, op, 
dataset, recType, datasetIndexes, context);
+                boolean found = findMacthedExprFieldName(optFuncExpr, op, 
dataset, itemType, datasetIndexes, context,
+                        filterSourceIndicator);
                 // the field name source should be from the dataset record, 
i.e. source should be == 0
                 if (found && 
optFuncExpr.getFieldName(0).equals(filterFieldName)
-                        && optFuncExpr.getFieldSource(0) == 0) {
+                        && optFuncExpr.getFieldSource(0) == 
filterSourceIndicator) {
                     optFuncExprs.add(optFuncExpr);
                 }
             }
@@ -490,8 +496,8 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
     }
 
     private boolean findMacthedExprFieldName(IOptimizableFuncExpr optFuncExpr, 
AbstractLogicalOperator op,
-            Dataset dataset, ARecordType recType, List<Index> datasetIndexes, 
IOptimizationContext context)
-            throws AlgebricksException {
+            Dataset dataset, ARecordType filterSourceType, List<Index> 
datasetIndexes, IOptimizationContext context,
+            Integer filterSourceIndicator) throws AlgebricksException {
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
@@ -503,13 +509,16 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
                     if (funcVarIndex == -1) {
                         continue;
                     }
-                    // TODO(ali): this SQ NPE should be investigated
-                    List<String> fieldName =
-                            getFieldNameFromSubAssignTree(optFuncExpr, 
descendantOp, varIndex, recType).second;
-                    if (fieldName == null) {
+                    Pair<ARecordType, List<String>> fieldNamePairs =
+                            getFieldNameFromSubAssignTree(optFuncExpr, 
descendantOp, varIndex, filterSourceType,
+                                    filterSourceIndicator, 
dataset.getPrimaryKeys().size());
+                    if (fieldNamePairs == null) {
                         return false;
                     }
-                    optFuncExpr.setFieldName(funcVarIndex, fieldName, 0);
+                    List<String> fieldName = fieldNamePairs.second;
+                    // Since we validated the filter source in 
getFieldNameFromSubAssignTree, we can safely set the
+                    // fieldSource to be filterSourceIndicator
+                    optFuncExpr.setFieldName(funcVarIndex, fieldName, 
filterSourceIndicator);
                     return true;
                 }
             } else if (descendantOp.getOperatorTag() == 
LogicalOperatorTag.DATASOURCESCAN) {
@@ -564,7 +573,7 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
                     IAType metaItemType = ((MetadataProvider) 
context.getMetadataProvider())
                             .findType(dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName());
                     ARecordType metaRecType = (ARecordType) metaItemType;
-                    int numSecondaryKeys = 
KeyFieldTypeUtil.getNumSecondaryKeys(index, recType, metaRecType);
+                    int numSecondaryKeys = 
KeyFieldTypeUtil.getNumSecondaryKeys(index, filterSourceType, metaRecType);
                     List<String> fieldName;
                     int keySource;
                     if (varIndex >= numSecondaryKeys) {
@@ -596,7 +605,8 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
     }
 
     private Pair<ARecordType, List<String>> 
getFieldNameFromSubAssignTree(IOptimizableFuncExpr optFuncExpr,
-            AbstractLogicalOperator op, int varIndex, ARecordType recType) {
+            AbstractLogicalOperator op, int varIndex, ARecordType 
filterSourceType, Integer filterSourceIndicator,
+            int numOfPKeys) {
         AbstractLogicalExpression expr = null;
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
@@ -619,8 +629,12 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
             for (int varCheck = 0; varCheck < op.getInputs().size(); 
varCheck++) {
                 AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) 
op.getInputs().get(varCheck).getValue();
                 if (nestedOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
-                    if (varCheck == op.getInputs().size() - 1) {
-
+                    if (nestedOp.getOperatorTag() != 
LogicalOperatorTag.DATASOURCESCAN) {
+                        return null;
+                    }
+                    List<LogicalVariable> scannedVars = 
((DataSourceScanOperator) nestedOp).getScanVariables();
+                    if (scannedVars.indexOf(usedVar) != filterSourceIndicator 
+ numOfPKeys) {
+                        return null;
                     }
                 } else {
                     int nestedAssignVar = ((AssignOperator) 
nestedOp).getVariables().indexOf(usedVar);
@@ -630,9 +644,10 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
                     //get the nested info from the lower input
                     Pair<ARecordType, List<String>> lowerInfo = 
getFieldNameFromSubAssignTree(optFuncExpr,
                             (AbstractLogicalOperator) 
op.getInputs().get(varCheck).getValue(), nestedAssignVar,
-                            recType);
+                            filterSourceType, filterSourceIndicator, 
numOfPKeys);
                     if (lowerInfo != null) {
-                        recType = lowerInfo.first;
+                        // propagate filterSourceType in case the filter value 
comes from a nested attribute.
+                        filterSourceType = lowerInfo.first;
                         returnList = lowerInfo.second;
                     }
                 }
@@ -644,18 +659,18 @@ public class IntroduceLSMComponentFilterRule implements 
IAlgebraicRewriteRule {
                     return null;
                 }
                 returnList.add(fieldName);
-                return new Pair<>(recType, returnList);
+                return new Pair<>(filterSourceType, returnList);
             } else if (funcIdent == BuiltinFunctions.FIELD_ACCESS_BY_INDEX) {
                 Integer fieldIndex = 
ConstantExpressionUtil.getIntArgument(funcExpr, 1);
                 if (fieldIndex == null) {
                     return null;
                 }
-                returnList.add(recType.getFieldNames()[fieldIndex]);
-                IAType subType = recType.getFieldTypes()[fieldIndex];
+                returnList.add(filterSourceType.getFieldNames()[fieldIndex]);
+                IAType subType = filterSourceType.getFieldTypes()[fieldIndex];
                 if (subType.getTypeTag() == ATypeTag.OBJECT) {
-                    recType = (ARecordType) subType;
+                    filterSourceType = (ARecordType) subType;
                 }
-                return new Pair<>(recType, returnList);
+                return new Pair<>(filterSourceType, returnList);
             }
 
         }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 039b8db..fd4fd90 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -262,6 +262,7 @@ abstract class LangExpressionToPlanTranslator
             assign.setExplicitOrderingProperty(new 
LocalOrderProperty(orderColumns));
         }
 
+        // Load does not support meta record now.
         List<String> additionalFilteringField = 
DatasetUtil.getFilterField(targetDatasource.getDataset());
         List<LogicalVariable> additionalFilteringVars;
         List<Mutable<ILogicalExpression>> additionalFilteringAssignExpressions;
@@ -365,67 +366,52 @@ abstract class LangExpressionToPlanTranslator
             topOp.getInputs().get(0).setValue(assignCollectionToSequence);
             ProjectOperator projectOperator = (ProjectOperator) topOp;
             projectOperator.getVariables().set(0, seqVar);
-            resVar = seqVar;
+
             DatasetDataSource targetDatasource =
                     validateDatasetInfo(metadataProvider, 
stmt.getDataverseName(), stmt.getDatasetName(), sourceLoc);
             List<Integer> keySourceIndicator =
                     ((InternalDatasetDetails) 
targetDatasource.getDataset().getDatasetDetails())
                             .getKeySourceIndicator();
-            ArrayList<LogicalVariable> vars = new ArrayList<>();
-            ArrayList<Mutable<ILogicalExpression>> exprs = new ArrayList<>();
+            ArrayList<LogicalVariable> pkeyVars = new ArrayList<>();
+            ArrayList<Mutable<ILogicalExpression>> pkeyExprs = new 
ArrayList<>();
             List<Mutable<ILogicalExpression>> varRefsForLoading = new 
ArrayList<>();
             List<List<String>> partitionKeys = 
targetDatasource.getDataset().getPrimaryKeys();
             int numOfPrimaryKeys = partitionKeys.size();
             for (int i = 0; i < numOfPrimaryKeys; i++) {
                 if (keySourceIndicator == null || 
keySourceIndicator.get(i).intValue() == 0) {
                     // record part
-                    
PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), resVar, vars, 
exprs,
+                    
PlanTranslationUtil.prepareVarAndExpression(partitionKeys.get(i), seqVar, 
pkeyVars, pkeyExprs,
                             varRefsForLoading, context, sourceLoc);
                 } else {
                     // meta part
-                    
PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i), 
unnestVar, exprs, vars,
-                            varRefsForLoading, context, sourceLoc);
+                    
PlanTranslationUtil.prepareMetaKeyAccessExpression(partitionKeys.get(i), 
unnestVar, pkeyExprs,
+                            pkeyVars, varRefsForLoading, context, sourceLoc);
                 }
             }
 
-            AssignOperator assign = new AssignOperator(vars, exprs);
-            assign.setSourceLocation(sourceLoc);
-            List<String> additionalFilteringField = 
DatasetUtil.getFilterField(targetDatasource.getDataset());
-            List<LogicalVariable> additionalFilteringVars;
-            List<Mutable<ILogicalExpression>> 
additionalFilteringAssignExpressions;
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions = 
null;
-            AssignOperator additionalFilteringAssign = null;
-            if (additionalFilteringField != null) {
-                additionalFilteringVars = new ArrayList<>();
-                additionalFilteringAssignExpressions = new ArrayList<>();
-                additionalFilteringExpressions = new ArrayList<>();
-
-                
PlanTranslationUtil.prepareVarAndExpression(additionalFilteringField, resVar, 
additionalFilteringVars,
-                        additionalFilteringAssignExpressions, 
additionalFilteringExpressions, context, sourceLoc);
-
-                additionalFilteringAssign =
-                        new AssignOperator(additionalFilteringVars, 
additionalFilteringAssignExpressions);
-                additionalFilteringAssign.getInputs().add(new 
MutableObject<>(topOp));
-                additionalFilteringAssign.setSourceLocation(sourceLoc);
-                assign.getInputs().add(new 
MutableObject<>(additionalFilteringAssign));
-            } else {
-                assign.getInputs().add(new MutableObject<>(topOp));
-            }
+            AssignOperator pkeyAssignOp = new AssignOperator(pkeyVars, 
pkeyExprs);
+            pkeyAssignOp.setSourceLocation(sourceLoc);
+            pkeyAssignOp.getInputs().add(new MutableObject<>(topOp));
+
+            // the filters and metas could be handled here once we have 
unified processing for metas in
+            // all insert/upsert/delete
 
+            VariableReferenceExpression seqVarRef = new 
VariableReferenceExpression(seqVar);
+            seqVarRef.setSourceLocation(sourceLoc);
+            Mutable<ILogicalExpression> seqRef = new 
MutableObject<>(seqVarRef);
             ILogicalOperator leafOperator;
             switch (stmt.getKind()) {
                 case INSERT:
-                    leafOperator = translateInsert(targetDatasource, resVar, 
varRefsForLoading,
-                            additionalFilteringExpressions, assign, stmt, 
resultMetadata);
+                    leafOperator = translateInsert(targetDatasource, seqRef, 
varRefsForLoading, seqVar, pkeyAssignOp,
+                            stmt, resultMetadata);
                     break;
                 case UPSERT:
-                    leafOperator = translateUpsert(targetDatasource, resVar, 
varRefsForLoading,
-                            additionalFilteringExpressions, assign, 
additionalFilteringField, unnestVar, topOp, exprs,
-                            additionalFilteringAssign, stmt, resultMetadata);
+                    leafOperator = translateUpsert(targetDatasource, seqRef, 
varRefsForLoading, pkeyAssignOp, unnestVar,
+                            topOp, pkeyExprs, seqVar, stmt, resultMetadata);
                     break;
                 case DELETE:
-                    leafOperator = translateDelete(targetDatasource, resVar, 
varRefsForLoading,
-                            additionalFilteringExpressions, assign, stmt);
+                    leafOperator =
+                            translateDelete(targetDatasource, seqRef, 
varRefsForLoading, seqVar, pkeyAssignOp, stmt);
                     break;
                 default:
                     throw new 
CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -439,9 +425,8 @@ abstract class LangExpressionToPlanTranslator
         return plan;
     }
 
-    protected ILogicalOperator translateDelete(DatasetDataSource 
targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
+    private ILogicalOperator translateDelete(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, 
LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
             ICompiledDmlStatement stmt) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (targetDatasource.getDataset().hasMetaPart()) {
@@ -449,12 +434,19 @@ abstract class LangExpressionToPlanTranslator
                     targetDatasource.getDataset().getDatasetName()
                             + ": delete from dataset is not supported on 
Datasets with Meta records");
         }
-        VariableReferenceExpression varRef = new 
VariableReferenceExpression(resVar);
-        varRef.setSourceLocation(stmt.getSourceLocation());
-        InsertDeleteUpsertOperator deleteOp = new 
InsertDeleteUpsertOperator(targetDatasource,
-                new MutableObject<>(varRef), varRefsForLoading, 
InsertDeleteUpsertOperator.Kind.DELETE, false);
-        
deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        deleteOp.getInputs().add(new MutableObject<>(assign));
+
+        List<String> filterField = 
DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+
+        // currently, meta-datasets cannot be inserted.
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, 
seqVar, sourceLoc);
+        }
+
+        InsertDeleteUpsertOperator deleteOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, 
false);
+        deleteOp.setAdditionalFilteringExpressions(filterExprs);
+        deleteOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
         deleteOp.setSourceLocation(sourceLoc);
         DelegateOperator leafOperator = new DelegateOperator(new 
CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
@@ -462,12 +454,11 @@ abstract class LangExpressionToPlanTranslator
         return leafOperator;
     }
 
-    protected ILogicalOperator translateUpsert(DatasetDataSource 
targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
-            List<String> additionalFilteringField, LogicalVariable unnestVar, 
ILogicalOperator topOp,
-            List<Mutable<ILogicalExpression>> exprs, AssignOperator 
additionalFilteringAssign,
-            ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws 
AlgebricksException {
+    private ILogicalOperator translateUpsert(DatasetDataSource 
targetDatasource,
+            Mutable<ILogicalExpression> payloadVarRef, 
List<Mutable<ILogicalExpression>> varRefsForLoading,
+            ILogicalOperator pkeyAssignOp, LogicalVariable unnestVar, 
ILogicalOperator topOp,
+            List<Mutable<ILogicalExpression>> pkeyExprs, LogicalVariable 
seqVar, ICompiledDmlStatement stmt,
+            IResultMetadata resultMetadata) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (!targetDatasource.getDataset().allow(topOp, 
DatasetUtil.OP_UPSERT)) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, 
sourceLoc,
@@ -479,15 +470,15 @@ abstract class LangExpressionToPlanTranslator
         Expression returnExpression = compiledUpsert.getReturnExpression();
         InsertDeleteUpsertOperator upsertOp;
         ILogicalOperator rootOperator;
+        List<String> filterField = 
DatasetUtil.getFilterField(targetDatasource.getDataset());
+
         if (targetDatasource.getDataset().hasMetaPart()) {
             if (returnExpression != null) {
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR, 
sourceLoc,
                         "Returning not allowed on datasets with Meta records");
             }
-            AssignOperator metaAndKeysAssign;
             List<LogicalVariable> metaAndKeysVars;
             List<Mutable<ILogicalExpression>> metaAndKeysExprs;
-            List<Mutable<ILogicalExpression>> metaExpSingletonList;
             metaAndKeysVars = new ArrayList<>();
             metaAndKeysExprs = new ArrayList<>();
             // add the meta function
@@ -499,18 +490,16 @@ abstract class LangExpressionToPlanTranslator
             metaFunction.setSourceLocation(sourceLoc);
             // create assign for the meta part
             LogicalVariable metaVar = context.newVar();
-            metaExpSingletonList = new ArrayList<>(1);
             VariableReferenceExpression metaVarRef = new 
VariableReferenceExpression(metaVar);
             metaVarRef.setSourceLocation(sourceLoc);
-            metaExpSingletonList.add(new MutableObject<>(metaVarRef));
             metaAndKeysVars.add(metaVar);
             metaAndKeysExprs.add(new MutableObject<>(metaFunction));
             project.getVariables().add(metaVar);
             varRefsForLoading.clear();
-            for (Mutable<ILogicalExpression> assignExpr : exprs) {
+            for (Mutable<ILogicalExpression> assignExpr : pkeyExprs) {
                 if (assignExpr.getValue().getExpressionTag() == 
LogicalExpressionTag.FUNCTION_CALL) {
                     AbstractFunctionCallExpression funcCall = 
(AbstractFunctionCallExpression) assignExpr.getValue();
-                    funcCall.substituteVar(resVar, unnestVar);
+                    funcCall.substituteVar(seqVar, unnestVar);
                     LogicalVariable pkVar = context.newVar();
                     metaAndKeysVars.add(pkVar);
                     metaAndKeysExprs.add(new 
MutableObject<>(assignExpr.getValue()));
@@ -519,58 +508,68 @@ abstract class LangExpressionToPlanTranslator
                 }
             }
             // A change feed, we don't need the assign to access PKs
-            VariableReferenceExpression varRef = new 
VariableReferenceExpression(resVar);
-            varRef.setSourceLocation(stmt.getSourceLocation());
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new 
MutableObject<>(varRef), varRefsForLoading,
-                    metaExpSingletonList, 
InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, 
payloadVarRef, varRefsForLoading,
+                    Collections.singletonList(new 
MutableObject<>(metaVarRef)), InsertDeleteUpsertOperator.Kind.UPSERT,
+                    false);
             upsertOp.setUpsertIndicatorVar(context.newVar());
             upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the 
original record
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(targetDatasource.getItemType());
             upsertOp.setSourceLocation(sourceLoc);
-            if (targetDatasource.getDataset().hasMetaPart()) {
-                List<LogicalVariable> metaVars = new ArrayList<>();
-                metaVars.add(context.newVar());
-                upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
-                List<Object> metaTypes = new ArrayList<>();
-                metaTypes.add(targetDatasource.getMetaItemType());
-                upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
-            }
 
-            if (additionalFilteringField != null) {
-                upsertOp.setPrevFilterVar(context.newVar());
-                upsertOp.setPrevFilterType(
-                        ((ARecordType) 
targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
-                additionalFilteringAssign.getInputs().clear();
-                
additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                upsertOp.getInputs().add(new 
MutableObject<>(additionalFilteringAssign));
-            } else {
-                upsertOp.getInputs().add(assign.getInputs().get(0));
-            }
-            metaAndKeysAssign = new AssignOperator(metaAndKeysVars, 
metaAndKeysExprs);
+            List<LogicalVariable> metaVars = new ArrayList<>();
+            metaVars.add(context.newVar());
+            upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
+            List<Object> metaTypes = new ArrayList<>();
+            metaTypes.add(targetDatasource.getMetaItemType());
+            upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+
+            // insert meta key assign before project
+            AssignOperator metaAndKeysAssign = new 
AssignOperator(metaAndKeysVars, metaAndKeysExprs);
             metaAndKeysAssign.getInputs().add(topOp.getInputs().get(0));
             metaAndKeysAssign.setSourceLocation(sourceLoc);
             topOp.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-            
upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+
+            // insert filter assign
+            if (filterField != null) {
+                LogicalVariable filterSourceVar =
+                        
DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset()) == 0 ? 
seqVar : metaVar;
+                ARecordType filterSourceType = 
DatasetUtil.getFilterSourceIndicator(targetDatasource.getDataset()) == 0
+                        ? (ARecordType) targetDatasource.getItemType()
+                        : (ARecordType) targetDatasource.getMetaItemType();
+
+                List<Mutable<ILogicalExpression>> filterExprs =
+                        generatedFilterExprs(pkeyAssignOp, filterField, 
filterSourceVar, sourceLoc);
+
+                upsertOp.setPrevFilterVar(context.newVar());
+                
upsertOp.setPrevFilterType(filterSourceType.getFieldType(filterField.get(0)));
+                upsertOp.setAdditionalFilteringExpressions(filterExprs);
+                upsertOp.getInputs().add(pkeyAssignOp.getInputs().get(0));
+            } else {
+                upsertOp.getInputs().add(new MutableObject<>(topOp));
+                upsertOp.setAdditionalFilteringExpressions(null);
+            }
         } else {
-            VariableReferenceExpression varRef = new 
VariableReferenceExpression(resVar);
-            varRef.setSourceLocation(stmt.getSourceLocation());
-            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, new 
MutableObject<>(varRef), varRefsForLoading,
+            ARecordType recordType = (ARecordType) 
targetDatasource.getItemType();
+            List<Mutable<ILogicalExpression>> filterExprs = null;
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, 
payloadVarRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            
upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            upsertOp.getInputs().add(new MutableObject<>(assign));
+
+            if (filterField != null) {
+                // add filter assign
+                filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, 
seqVar, sourceLoc);
+                upsertOp.setPrevFilterVar(context.newVar());
+                
upsertOp.setPrevFilterType(recordType.getFieldType(filterField.get(0)));
+            }
+            upsertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
+            upsertOp.setAdditionalFilteringExpressions(filterExprs);
             upsertOp.setSourceLocation(sourceLoc);
             upsertOp.setUpsertIndicatorVar(context.newVar());
             upsertOp.setUpsertIndicatorVarType(BuiltinType.ABOOLEAN);
             // Create and add a new variable used for representing the 
original record
-            ARecordType recordType = (ARecordType) 
targetDatasource.getItemType();
             upsertOp.setPrevRecordVar(context.newVar());
             upsertOp.setPrevRecordType(recordType);
-            if (additionalFilteringField != null) {
-                upsertOp.setPrevFilterVar(context.newVar());
-                
upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
-            }
         }
         DelegateOperator delegateOperator = new DelegateOperator(new 
CommitOperator(returnExpression == null));
         delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
@@ -581,9 +580,8 @@ abstract class LangExpressionToPlanTranslator
         return processReturningExpression(rootOperator, upsertOp, 
compiledUpsert, resultMetadata);
     }
 
-    protected ILogicalOperator translateInsert(DatasetDataSource 
targetDatasource, LogicalVariable resVar,
-            List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, 
ILogicalOperator assign,
+    private ILogicalOperator translateInsert(DatasetDataSource 
targetDatasource, Mutable<ILogicalExpression> varRef,
+            List<Mutable<ILogicalExpression>> varRefsForLoading, 
LogicalVariable seqVar, ILogicalOperator pkeyAssignOp,
             ICompiledDmlStatement stmt, IResultMetadata resultMetadata) throws 
AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
         if (targetDatasource.getDataset().hasMetaPart()) {
@@ -591,13 +589,20 @@ abstract class LangExpressionToPlanTranslator
                     targetDatasource.getDataset().getDatasetName()
                             + ": insert into dataset is not supported on 
Datasets with Meta records");
         }
+
+        List<String> filterField = 
DatasetUtil.getFilterField(targetDatasource.getDataset());
+        List<Mutable<ILogicalExpression>> filterExprs = null;
+
+        // currently, meta-datasets cannot be inserted.
+        if (filterField != null) {
+            filterExprs = generatedFilterExprs(pkeyAssignOp, filterField, 
seqVar, sourceLoc);
+        }
+
         // Adds the insert operator.
-        VariableReferenceExpression varRef = new 
VariableReferenceExpression(resVar);
-        varRef.setSourceLocation(stmt.getSourceLocation());
-        InsertDeleteUpsertOperator insertOp = new 
InsertDeleteUpsertOperator(targetDatasource,
-                new MutableObject<>(varRef), varRefsForLoading, 
InsertDeleteUpsertOperator.Kind.INSERT, false);
-        
insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.getInputs().add(new MutableObject<>(assign));
+        InsertDeleteUpsertOperator insertOp = new 
InsertDeleteUpsertOperator(targetDatasource, varRef,
+                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, 
false);
+        insertOp.setAdditionalFilteringExpressions(filterExprs);
+        insertOp.getInputs().add(new MutableObject<>(pkeyAssignOp));
         insertOp.setSourceLocation(sourceLoc);
 
         // Adds the commit operator.
@@ -611,6 +616,21 @@ abstract class LangExpressionToPlanTranslator
         return processReturningExpression(rootOperator, insertOp, 
compiledInsert, resultMetadata);
     }
 
+    private List<Mutable<ILogicalExpression>> 
generatedFilterExprs(ILogicalOperator pkeyAssignOp,
+            List<String> filterField, LogicalVariable seqVar, SourceLocation 
sourceLoc) {
+        List<LogicalVariable> filterVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> filterAssignExprs = new 
ArrayList<>();
+        List<Mutable<ILogicalExpression>> filterExprs = new ArrayList<>();
+
+        PlanTranslationUtil.prepareVarAndExpression(filterField, seqVar, 
filterVars, filterAssignExprs, filterExprs,
+                context, sourceLoc);
+        AssignOperator additionalFilteringAssign = new 
AssignOperator(filterVars, filterAssignExprs);
+        
additionalFilteringAssign.getInputs().add(pkeyAssignOp.getInputs().get(0));
+        additionalFilteringAssign.setSourceLocation(sourceLoc);
+        pkeyAssignOp.getInputs().set(0, new 
MutableObject<>(additionalFilteringAssign));
+        return filterExprs;
+    }
+
     // Stitches the translated operators for the returning expression into the 
query
     // plan.
     protected ILogicalOperator processReturningExpression(ILogicalOperator 
inputOperator,
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
index e587e70..2869313 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
@@ -45,10 +45,14 @@ public class ValidateUtil {
     /**
      * Validates the field that will be used as filter for the components of 
an LSM index.
      *
-     * @param dataset
-     *            the dataset
      * @param recordType
      *            the record type
+     * @param metaType
+     *            the meta record type
+     * @param filterSourceIndicator
+     *            indicates where the filter attribute comes from, 0 for 
record, 1 for meta record.
+     *            since this method is called only when a filter field 
presents, filterSourceIndicator will not be null
+     *
      * @param filterField
      *            the full name of the field
      * @param sourceLoc
@@ -57,9 +61,10 @@ public class ValidateUtil {
      *             if field type can't be a filter type.
      *             if field type is nullable.
      */
-    public static void validateFilterField(ARecordType recordType, 
List<String> filterField, SourceLocation sourceLoc)
-            throws AlgebricksException {
-        IAType fieldType = recordType.getSubFieldType(filterField);
+    public static void validateFilterField(ARecordType recordType, ARecordType 
metaType, Integer filterSourceIndicator,
+            List<String> filterField, SourceLocation sourceLoc) throws 
AlgebricksException {
+        ARecordType itemType = filterSourceIndicator == 0 ? recordType : 
metaType;
+        IAType fieldType = itemType.getSubFieldType(filterField);
         if (fieldType == null) {
             throw new 
CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, sourceLoc,
                     RecordUtil.toFullyQualifiedName(filterField));
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index dd1f193..9a8895b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -769,8 +769,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                             metaRecType, partitioningExprs, 
keySourceIndicators, autogenerated, sourceLoc);
 
                     List<String> filterField = ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getFilterField();
+                    Integer filterSourceIndicator =
+                            ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getFilterSourceIndicator();
+
                     if (filterField != null) {
-                        ValidateUtil.validateFilterField(aRecordType, 
filterField, sourceLoc);
+                        ValidateUtil.validateFilterField(aRecordType, 
metaRecType, filterSourceIndicator, filterField,
+                                sourceLoc);
                     }
                     if (compactionPolicy == null && filterField != null) {
                         // If the dataset has a filter and the user didn't 
specify a merge
@@ -781,7 +785,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                     }
                     datasetDetails = new 
InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                             InternalDatasetDetails.PartitioningStrategy.HASH, 
partitioningExprs, partitioningExprs,
-                            keySourceIndicators, partitioningTypes, 
autogenerated, filterField);
+                            keySourceIndicators, partitioningTypes, 
autogenerated, filterSourceIndicator, filterField);
                     break;
                 case EXTERNAL:
                     ExternalDetailsDecl externalDetails = 
(ExternalDetailsDecl) dd.getDatasetDetailsDecl();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index ebf98df..1040781 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -800,8 +800,8 @@ public class TestNodeController {
                 new LSMPrimaryUpsertOperatorNodePushable(ctx, 
ctx.getTaskAttemptId().getTaskId().getPartition(),
                         indexHelperFactory, 
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                         recordDescProvider.getInputRecordDescriptor(new 
ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        modificationCallbackFactory, searchCallbackFactory,
-                        keyIndexes.length, recordType, -1, 
frameOpCallbackFactory == null
+                        modificationCallbackFactory, searchCallbackFactory, 
keyIndexes.length,
+                        0, recordType, -1, frameOpCallbackFactory == null
                                 ? 
dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory,
                         MissingWriterFactory.INSTANCE, hasSecondaries);
         RecordDescriptor upsertOutRecDesc = 
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
index b43c445..d60702f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -165,9 +165,9 @@ public class CheckpointInSecondaryIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, 
null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, 
INDEX_TYPE, INDEX_FIELD_NAMES,
                 INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, 
false, 0);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 0e51f28..2034aa8 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -189,16 +189,16 @@ public class GlobalVirtualBufferCacheTest {
 
     private void createIndex() throws Exception {
         dataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "ds", 
StorageTestUtils.DATAVERSE_NAME,
-                StorageTestUtils.DATA_TYPE_NAME, 
StorageTestUtils.NODE_GROUP_NAME,
-                NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                        StorageTestUtils.PARTITIONING_KEYS, null, null, null, 
false, null),
+                StorageTestUtils.DATA_TYPE_NAME, 
StorageTestUtils.NODE_GROUP_NAME, NoMergePolicyFactory.NAME,
+                null, new InternalDatasetDetails(null, 
PartitioningStrategy.HASH, StorageTestUtils.PARTITIONING_KEYS,
+                        null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID, 0);
 
         filteredDataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, 
"filtered_ds",
                 StorageTestUtils.DATAVERSE_NAME, 
StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
                 NoMergePolicyFactory.NAME, null,
                 new InternalDatasetDetails(null, PartitioningStrategy.HASH, 
StorageTestUtils.PARTITIONING_KEYS, null,
-                        null, null, false, Collections.singletonList("value")),
+                        null, null, false, 0, 
Collections.singletonList("value")),
                 null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID + 1, 
0);
 
         primaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 67c291f..41b889d 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -155,9 +155,9 @@ public class MultiPartitionLSMIndexTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, 
null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, 
INDEX_TYPE, INDEX_FIELD_NAMES,
                 INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, 
false, 0);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 28da85c..a9e86cb 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -124,9 +124,9 @@ public class SearchCursorComponentSwitchTest {
     public void createIndex() throws Exception {
         List<List<String>> partitioningKeys = new ArrayList<>();
         partitioningKeys.add(Collections.singletonList("key"));
-        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME,
-                NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null,
-                        PartitioningStrategy.HASH, partitioningKeys, null, 
null, null, false, null),
+        dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+                NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                        partitioningKeys, null, null, null, false, null, null),
                 null, DatasetType.INTERNAL, DATASET_ID, 0);
         PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, 
KEY_TYPES, RECORD_TYPE, META_TYPE, null,
                 storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index e41c193..3ef1e62 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -95,7 +95,7 @@ public class StorageTestUtils {
     public static final TestDataset DATASET =
             new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, 
DATA_TYPE_NAME, NODE_GROUP_NAME,
                     NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
-                            PARTITIONING_KEYS, null, null, null, false, null),
+                            PARTITIONING_KEYS, null, null, null, false, null, 
null),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
 
     private StorageTestUtils() {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index a6a53cb..f28e9bb 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -71,9 +71,9 @@ public class TestDataset extends Dataset {
     public IResourceFactory getResourceFactory(MetadataProvider mdProvider, 
Index index, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, 
Map<String, String> mergePolicyProperties)
             throws AlgebricksException {
-        ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(this, recordType);
+        ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(this, recordType, metaType);
         IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtil.computeFilterBinaryComparatorFactories(this,
-                recordType, 
mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+                recordType, metaType, 
mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
         IResourceFactory resourceFactory =
                 
TestLsmBTreeResourceFactoryProvider.INSTANCE.getResourceFactory(mdProvider, 
this, index, recordType,
                         metaType, mergePolicyFactory, mergePolicyProperties, 
filterTypeTraits, filterCmpFactories);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index f7f7207..d08a960 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -105,9 +105,9 @@ public class IndexDropOperatorNodePushableTest {
             List<List<String>> partitioningKeys = new ArrayList<>();
             partitioningKeys.add(Collections.singletonList("key"));
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, 
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
-                    NoMergePolicyFactory.NAME,
-                    null, new InternalDatasetDetails(null, 
InternalDatasetDetails.PartitioningStrategy.HASH,
-                            partitioningKeys, null, null, null, false, null),
+                    NoMergePolicyFactory.NAME, null,
+                    new InternalDatasetDetails(null, 
InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys,
+                            null, null, null, false, null, null),
                     null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0);
             // create dataset
             TestNodeController.PrimaryIndexInfo indexInfo = 
nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
similarity index 63%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
index 6a6c12e..33d826b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_0.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on record
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on attr;
+
+select * from KVStore WHERE attr > 10;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
similarity index 62%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
index 6a6c12e..3a5743c 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_1.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on meta().seq;
+
+select * from KVStore WHERE meta().seq > 11;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
similarity index 61%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
index 6a6c12e..415e1a9 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_2.sqlpp
@@ -16,7 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta key and query on meta key
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on meta().`key`;
+
+select k, meta() as meta from KVStore k WHERE meta().`key` > 12;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
similarity index 62%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
index 6a6c12e..4cca505 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_3.sqlpp
@@ -16,7 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on record
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on attr;
+
+select * from KVStore WHERE attr > 10;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
similarity index 61%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
index 6a6c12e..cfd4363 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_4.sqlpp
@@ -16,7 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on meta().attr;
+
+select * from KVStore WHERE meta().attr > 10;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
similarity index 59%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
index 6a6c12e..0383e88 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/filter_on_meta_5.sqlpp
@@ -16,7 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+// filter on meta and query on meta with a composite primary key
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
 
-use test;
+create type DocumentType as open{
+  attr1: int64,
+  attr2: int64
+};
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
CLASS2_DS2 c)} from CLASS2_DS2 v order by v.id;
\ No newline at end of file
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  seq:int64,
+  lockTime:int32,
+  attr: int64
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key attr1, 
attr2 with filter on meta().attr;
+
+select * from KVStore WHERE meta().attr > 10;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan
new file mode 100644
index 0000000..fbdf7a6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_0.plan
@@ -0,0 +1,11 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |PARTITIONED|
+          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_1.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_2.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan
new file mode 100644
index 0000000..fbdf7a6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_3.plan
@@ -0,0 +1,11 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |PARTITIONED|
+          -- STREAM_PROJECT  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_4.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan
new file mode 100644
index 0000000..69abc9a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/filter_on_meta_5.plan
@@ -0,0 +1,12 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- STREAM_SELECT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..fbcc7db
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.ddl.sqlpp
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"; you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse KeyVerse if exists;
+create dataverse KeyVerse;
+use KeyVerse;
+
+create type DocumentType as open{
+};
+
+create type KVMetaType as open{
+  `key`:string,
+  vbucket:int32,
+  seq:int64,
+  cas:int64,
+  expiration:int32,
+  flags:int32,
+  revSeq:int64,
+  lockTime:int32
+};
+
+create dataset KVStore(DocumentType) with meta(KVMetaType) primary key 
meta().`key` with filter on meta().seq;
+
+create feed KVChangeStream with {
+ "adapter-name" : "adapter",
+  "type-name" : "DocumentType",
+  "meta-type-name" : "KVMetaType",
+  "reader" : 
"org.apache.asterix.external.input.record.reader.kv.KVTestReaderFactory",
+  "parser" : "record-with-metadata",
+  "format" : "dcp",
+  "record-format" : "json",
+  "change-feed" : "true",
+  "key-indexes" : "0",
+  "key-indicators" : "1",
+  "num-of-records" : "1000"
+};
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
similarity index 86%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
index 459bb88..76ad55b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.2.update.sqlpp
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use KeyVerse;
 
-use test;
+set `wait-for-completion-feed` "true";
+connect feed KVChangeStream to dataset KVStore;
 
-select value {"rec": v, "meta": meta(), "count": (select value count(*) from 
UK_DS1 c)} from UK_DS1 v order by v.id;
\ No newline at end of file
+start feed KVChangeStream;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
similarity index 92%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
index f12a2b7..61c1d45 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.3.query.sqlpp
@@ -16,5 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use KeyVerse;
 
-drop dataverse test;
\ No newline at end of file
+select k, meta() as meta from KVStore k
+where k.id = 5;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
similarity index 97%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
index f12a2b7..46a5fad 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.4.ddl.sqlpp
@@ -16,5 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-drop dataverse test;
\ No newline at end of file
+drop dataverse KeyVerse;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.1.ddl.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.1.ddl.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.1.ddl.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.10.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.10.update.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.10.update.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.10.update.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.11.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.11.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.12.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.12.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.13.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.13.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.14.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.14.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.15.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.15.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.16.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.16.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.17.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.17.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.18.ddl.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.18.ddl.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.18.ddl.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.2.update.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.2.update.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.2.update.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.3.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.3.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.4.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.4.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.5.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.5.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.6.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.6.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.7.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.7.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.8.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.8.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.9.query.sqlpp
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.query.sqlpp
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/change-feed-with-where-on-meta/change-feed-with-where-on-meta.9.query.sqlpp
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm
new file mode 100644
index 0000000..fa522e6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-filter-on-meta-dataset/change-feed-filter-on-meta-dataset.1.adm
@@ -0,0 +1 @@
+{ "k": { "id": 5, "name": "Ian Maxon", "exp": 15 }, "meta": { "key": "8-2", 
"vbucket": 8, "seq": 21, "cas": 5, "expiration": 8004, "flags": 0, "revSeq": 0, 
"lockTime": 163 } }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.11.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.11.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.11.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.12.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.12.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.12.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.13.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.13.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.13.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.14.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.14.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.14.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.15.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.15.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.15.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.16.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.16.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.16.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.17.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.17.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.17.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.3.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.3.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.3.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.4.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.4.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.4.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.5.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.5.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.5.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.6.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.6.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.6.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.7.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.7.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.7.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.8.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.8.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.8.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.9.adm
similarity index 100%
rename from 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-filter-on-meta/change-feed-with-filter-on-meta.9.adm
rename to 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-where-on-meta/change-feed-with-filter-on-meta.9.adm
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9e83b57..91abbea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12342,6 +12342,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
+      <compilation-unit name="change-feed-filter-on-meta-dataset">
+        <output-dir 
compare="Text">change-feed-filter-on-meta-dataset</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="feeds">
       <compilation-unit name="change-feed-with-meta-pk-index">
         <output-dir compare="Text">change-feed-with-meta-pk-index</output-dir>
       </compilation-unit>
@@ -12533,8 +12538,8 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
-      <compilation-unit name="change-feed-with-filter-on-meta">
-        <output-dir compare="Text">change-feed-with-filter-on-meta</output-dir>
+      <compilation-unit name="change-feed-with-where-on-meta">
+        <output-dir compare="Text">change-feed-with-where-on-meta</output-dir>
       </compilation-unit>
     </test-case>
   </test-group>
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 00c7210..5208b5d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -218,6 +218,7 @@ public class ErrorCode {
     public static final int UNKNOWN_ADAPTER = 1125;
     public static final int INVALID_EXTERNAL_IDENTIFIER_SIZE = 1126;
     public static final int UNSUPPORTED_ADAPTER_LANGUAGE = 1127;
+    public static final int INCONSISTENT_FILTER_INDICATOR = 1128;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 8e761d9..07a496c 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -213,6 +213,7 @@
 1125 = Cannot find adapter with name %1$s
 1126 = Invalid number of elements in external identifier. Expected %1$s 
elements for %2$s language
 1127 = Unsupported adapter language: %1$s
+1128 = Filter field is not defined properly
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj 
b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index 00d183f..e903977 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -561,11 +561,12 @@ DatasetDecl DatasetSpecification() throws ParseException:
         if(filterField!=null && filterField.first!=0){
           throw new ParseException("A filter field can only be a field in the 
main record of the dataset.");
         }
-        InternalDetailsDecl idd = new 
InternalDetailsDecl(primaryKeyFields.second,
+        try{
+          InternalDetailsDecl idd = new 
InternalDetailsDecl(primaryKeyFields.second,
                                                           
primaryKeyFields.first,
                                                           autogenerated,
+                                                          filterField == null? 
null : filterField.first,
                                                           filterField == null? 
null : filterField.second);
-        try{
           dsetDecl = new DatasetDecl(nameComponents.first,
                                    nameComponents.second,
                                    new TypeReferenceExpression(typeComponents),
diff --git 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
index f312ddd..3ddd261 100644
--- 
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
+++ 
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InternalDetailsDecl.java
@@ -20,18 +20,28 @@ package org.apache.asterix.lang.common.statement;
 
 import java.util.List;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+
 public class InternalDetailsDecl implements IDatasetDetailsDecl {
     private final List<List<String>> partitioningExprs;
     private final List<Integer> keySourceIndicators;
+    private final Integer filterSourceIndicator;
     private final boolean autogenerated;
     private final List<String> filterField;
 
     public InternalDetailsDecl(List<List<String>> partitioningExpr, 
List<Integer> keySourceIndicators,
-            boolean autogenerated, List<String> filterField) {
+            boolean autogenerated, Integer filterSourceIndicator, List<String> 
filterField)
+            throws CompilationException {
         this.partitioningExprs = partitioningExpr;
         this.keySourceIndicators = keySourceIndicators;
         this.autogenerated = autogenerated;
+        if (filterSourceIndicator == null && filterField != null
+                || filterSourceIndicator != null && filterField == null) {
+            throw new 
CompilationException(ErrorCode.INCONSISTENT_FILTER_INDICATOR);
+        }
         this.filterField = filterField;
+        this.filterSourceIndicator = filterSourceIndicator;
     }
 
     public List<List<String>> getPartitioningExprs() {
@@ -50,4 +60,7 @@ public class InternalDetailsDecl implements 
IDatasetDetailsDecl {
         return filterField;
     }
 
+    public Integer getFilterSourceIndicator() {
+        return filterSourceIndicator;
+    }
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj 
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 201ff7d..347dc18 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -835,13 +835,9 @@ DatasetDecl DatasetSpecification(Token startStmtToken) 
throws ParseException:
   ( LOOKAHEAD(2) <WITH> <FILTER> <ON>  filterField = NestedField() )?
   ( <WITH> withRecord = RecordConstructor() )?
   {
-    if(filterField!=null && filterField.first!=0){
-      throw new SqlppParseException(getSourceLocation(startStmtToken),
-        "A filter field can only be a field in the main record of the 
dataset.");
-    }
-    InternalDetailsDecl idd = new InternalDetailsDecl(primaryKeyFields.second, 
primaryKeyFields.first, autogenerated,
-      filterField == null? null : filterField.second);
     try {
+      InternalDetailsDecl idd = new 
InternalDetailsDecl(primaryKeyFields.second, primaryKeyFields.first, 
autogenerated,
+        filterField == null? null : filterField.first, filterField == null? 
null : filterField.second);
       stmt = new DatasetDecl(nameComponents.first, nameComponents.second, 
typeExpr, metaTypeExpr, hints,
         DatasetType.INTERNAL, idd, withRecord, ifNotExists);
       return addSourceLocation(stmt, startStmtToken);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 4a64b8a..cf089fa 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -211,7 +211,7 @@ public class MetadataBootstrap {
         for (int i = 0; i < indexes.length; i++) {
             IDatasetDetails id = new 
InternalDatasetDetails(FileStructure.BTREE, PartitioningStrategy.HASH,
                     indexes[i].getPartitioningExpr(), 
indexes[i].getPartitioningExpr(), null,
-                    indexes[i].getPartitioningExprType(), false, null);
+                    indexes[i].getPartitioningExprType(), false, null, null);
             MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                     new Dataset(indexes[i].getDataverseName(), 
indexes[i].getIndexedDatasetName(),
                             indexes[i].getDataverseName(), 
indexes[i].getPayloadRecordType().getTypeName(),
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 4c19b61..4e721cb 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -965,18 +965,21 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
         // set the record permutation
         fieldPermutation[i++] = inputSchema.findVariable(payload);
-        // set the filters' permutations.
-        if (numFilterFields > 0) {
-            int idx = inputSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[i++] = idx;
-        }
 
+        // set the meta record permutation
         if (additionalNonFilterFields != null) {
             for (LogicalVariable var : additionalNonFilterFields) {
                 int idx = inputSchema.findVariable(var);
                 fieldPermutation[i++] = idx;
             }
         }
+
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
         return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, 
fieldPermutation,
                 context.getMissingWriterFactory());
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e66057a..87c7d87 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -464,9 +464,9 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
     public IResourceFactory getResourceFactory(MetadataProvider mdProvider, 
Index index, ARecordType recordType,
             ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, 
Map<String, String> mergePolicyProperties)
             throws AlgebricksException {
-        ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(this, recordType);
+        ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(this, recordType, metaType);
         IBinaryComparatorFactory[] filterCmpFactories = 
DatasetUtil.computeFilterBinaryComparatorFactories(this,
-                recordType, 
mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
+                recordType, metaType, 
mdProvider.getStorageComponentProvider().getComparatorFactoryProvider());
         IResourceFactory resourceFactory;
         switch (index.getIndexType()) {
             case BTREE:
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index e4f8948..c82b86a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entities;
 import java.io.DataOutput;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
@@ -61,15 +62,18 @@ public class InternalDatasetDetails implements 
IDatasetDetails {
     private final List<List<String>> primaryKeys;
     private final List<IAType> primaryKeyTypes;
     private final boolean autogenerated;
+    private final Integer filterSourceIndicator;
     private final List<String> filterField;
     private final List<Integer> keySourceIndicators;
 
     public static final String FILTER_FIELD_NAME = "FilterField";
+    public static final String FILTER_SOURCE_INDICATOR_FIELD_NAME = 
"FilterSourceIndicator";
     public static final String KEY_FILD_SOURCE_INDICATOR_FIELD_NAME = 
"KeySourceIndicator";
 
     public InternalDatasetDetails(FileStructure fileStructure, 
PartitioningStrategy partitioningStrategy,
             List<List<String>> partitioningKey, List<List<String>> primaryKey, 
List<Integer> keyFieldIndicators,
-            List<IAType> primaryKeyType, boolean autogenerated, List<String> 
filterField) {
+            List<IAType> primaryKeyType, boolean autogenerated, Integer 
filterSourceIndicator,
+            List<String> filterField) {
         this.fileStructure = fileStructure;
         this.partitioningStrategy = partitioningStrategy;
         this.partitioningKeys = partitioningKey;
@@ -84,7 +88,14 @@ public class InternalDatasetDetails implements 
IDatasetDetails {
         this.keySourceIndicators = keyFieldIndicators;
         this.primaryKeyTypes = primaryKeyType;
         this.autogenerated = autogenerated;
-        this.filterField = filterField;
+        if (filterSourceIndicator != null) {
+            // to make sure filter source indicator and filter field is 
consistent
+            this.filterSourceIndicator = filterSourceIndicator;
+            this.filterField = Objects.requireNonNull(filterField);
+        } else {
+            this.filterSourceIndicator = null;
+            this.filterField = null;
+        }
     }
 
     public List<List<String>> getPartitioningKey() {
@@ -119,6 +130,10 @@ public class InternalDatasetDetails implements 
IDatasetDetails {
         return filterField;
     }
 
+    public Integer getFilterSourceIndicator() {
+        return filterSourceIndicator;
+    }
+
     @Override
     public DatasetType getDatasetType() {
         return DatasetType.INTERNAL;
@@ -209,10 +224,21 @@ public class InternalDatasetDetails implements 
IDatasetDetails {
                 fieldValue);
 
         // write filter fields if any
+        Integer filterSourceIndicator = getFilterSourceIndicator();
         List<String> filterField = getFilterField();
         if (filterField != null) {
-            listBuilder.reset(heterogeneousList);
             ArrayBackedValueStorage nameValue = new ArrayBackedValueStorage();
+            // write filter source indicator
+            nameValue.reset();
+            aString.setValue(FILTER_SOURCE_INDICATOR_FIELD_NAME);
+            stringSerde.serialize(aString, nameValue.getDataOutput());
+            fieldValue.reset();
+            aInt8.setValue(filterSourceIndicator.byteValue());
+            int8Serde.serialize(aInt8, fieldValue.getDataOutput());
+            internalRecordBuilder.addField(nameValue, fieldValue);
+
+            // write filter fields
+            listBuilder.reset(heterogeneousList);
             nameValue.reset();
             aString.setValue(FILTER_FIELD_NAME);
             stringSerde.serialize(aString, nameValue.getDataOutput());
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index f0567a4..e4069c2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -151,11 +151,25 @@ public class DatasetTupleTranslator extends 
AbstractTupleTranslator<Dataset> {
                         
.getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_AUTOGENERATED_FIELD_INDEX))
                                 .getBoolean();
 
+                // check if there is a filter source indicator
+                Integer filterSourceIndicator = null;
+                int filterSourceIndicatorPos = datasetDetailsRecord.getType()
+                        
.getFieldIndex(InternalDatasetDetails.FILTER_SOURCE_INDICATOR_FIELD_NAME);
+                if (filterSourceIndicatorPos >= 0) {
+                    filterSourceIndicator =
+                            (int) ((AInt8) 
datasetDetailsRecord.getValueByPos(filterSourceIndicatorPos)).getByteValue();
+                }
+
                 // Check if there is a filter field.
                 List<String> filterField = null;
                 int filterFieldPos =
                         
datasetDetailsRecord.getType().getFieldIndex(InternalDatasetDetails.FILTER_FIELD_NAME);
                 if (filterFieldPos >= 0) {
+                    // backward compatibility, if a dataset contains filter 
field but no filter source indicator
+                    // we set the indicator to 0 by default.
+                    if (filterSourceIndicator == null) {
+                        filterSourceIndicator = 0;
+                    }
                     filterField = new ArrayList<>();
                     cursor = ((AOrderedList) 
datasetDetailsRecord.getValueByPos(filterFieldPos)).getCursor();
                     while (cursor.next()) {
@@ -180,7 +194,8 @@ public class DatasetTupleTranslator extends 
AbstractTupleTranslator<Dataset> {
                 }
 
                 datasetDetails = new InternalDatasetDetails(fileStructure, 
partitioningStrategy, partitioningKey,
-                        partitioningKey, keyFieldSourceIndicator, 
partitioningKeyType, autogenerated, filterField);
+                        partitioningKey, keyFieldSourceIndicator, 
partitioningKeyType, autogenerated,
+                        filterSourceIndicator, filterField);
                 break;
             }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 3b312bb..762b9e5 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -103,35 +103,43 @@ public class DatasetUtil {
     private DatasetUtil() {
     }
 
+    public static Integer getFilterSourceIndicator(Dataset dataset) {
+        return ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getFilterSourceIndicator();
+    }
+
     public static List<String> getFilterField(Dataset dataset) {
         return ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getFilterField();
     }
 
     public static IBinaryComparatorFactory[] 
computeFilterBinaryComparatorFactories(Dataset dataset,
-            ARecordType itemType, IBinaryComparatorFactoryProvider 
comparatorFactoryProvider)
+            ARecordType recordType, ARecordType metaType, 
IBinaryComparatorFactoryProvider comparatorFactoryProvider)
             throws AlgebricksException {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return null;
         }
+        Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
         List<String> filterField = getFilterField(dataset);
         if (filterField == null) {
             return null;
         }
         IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
+        ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : 
metaType;
         IAType type = itemType.getSubFieldType(filterField);
         bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, 
true);
         return bcfs;
     }
 
-    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, 
ARecordType itemType)
+    public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, 
ARecordType recordType, ARecordType metaType)
             throws AlgebricksException {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return null;
         }
+        Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
         List<String> filterField = getFilterField(dataset);
         if (filterField == null) {
             return null;
         }
+        ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : 
metaType;
         ITypeTraits[] typeTraits = new ITypeTraits[1];
         IAType type = itemType.getSubFieldType(filterField);
         typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
@@ -151,7 +159,8 @@ public class DatasetUtil {
         int numKeys = partitioningKeys.size();
 
         int[] filterFields = new int[1];
-        filterFields[0] = numKeys + 1;
+        int valueFields = dataset.hasMetaPart() ? 2 : 1;
+        filterFields[0] = numKeys + valueFields;
         return filterFields;
     }
 
@@ -471,9 +480,13 @@ public class DatasetUtil {
         }
         // add the previous filter third
         int fieldIdx = -1;
+        Integer filterSourceIndicator = null;
+        ARecordType filterItemType = null;
         if (numFilterFields > 0) {
+            filterSourceIndicator = 
DatasetUtil.getFilterSourceIndicator(dataset);
             String filterField = DatasetUtil.getFilterField(dataset).get(0);
-            String[] fieldNames = itemType.getFieldNames();
+            filterItemType = filterSourceIndicator == 0 ? itemType : 
metaItemType;
+            String[] fieldNames = filterItemType.getFieldNames();
             int i = 0;
             for (; i < fieldNames.length; i++) {
                 if (fieldNames[i].equals(filterField)) {
@@ -481,9 +494,10 @@ public class DatasetUtil {
                 }
             }
             fieldIdx = i;
-            outputTypeTraits[f] = 
dataFormat.getTypeTraitProvider().getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+            outputTypeTraits[f] =
+                    
dataFormat.getTypeTraitProvider().getTypeTrait(filterItemType.getFieldTypes()[fieldIdx]);
             outputSerDes[f] =
-                    
dataFormat.getSerdeProvider().getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                    
dataFormat.getSerdeProvider().getSerializerDeserializer(filterItemType.getFieldTypes()[fieldIdx]);
             f++;
         }
         for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
@@ -493,7 +507,8 @@ public class DatasetUtil {
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, 
outputTypeTraits);
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, 
fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, 
searchCallbackFactory,
-                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, 
itemType, fieldIdx, hasSecondaries);
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, 
filterSourceIndicator, filterItemType,
+                fieldIdx, hasSecondaries);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index f47786d..0cff284 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -27,6 +27,7 @@ import 
org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
@@ -279,7 +280,13 @@ public class SecondaryBTreeOperationsHelper extends 
SecondaryTreeIndexOperations
             secondaryFieldAccessEvalFactories[numSecondaryKeys] = 
metadataProvider.getDataFormat()
                     
.getFieldAccessEvaluatorFactory(metadataProvider.getFunctionManager(), 
itemType, filterFieldName,
                             numPrimaryKeys, sourceLoc);
-            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            Pair<IAType, Boolean> keyTypePair;
+            // since filter is not null, it's safe to cast to internal
+            if (((InternalDatasetDetails) 
dataset.getDatasetDetails()).getFilterSourceIndicator() == 0) {
+                keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            } else {
+                keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, metaType);
+            }
             IAType type = keyTypePair.first;
             ISerializerDeserializer serde = 
serdeProvider.getSerializerDeserializer(type);
             secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 3930563..01e1af2 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -24,6 +24,7 @@ import 
org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryIndexBulkLoadOperatorDescriptor;
@@ -188,7 +189,13 @@ public class SecondaryCorrelatedBTreeOperationsHelper 
extends SecondaryCorrelate
             secondaryFieldAccessEvalFactories[numSecondaryKeys] =
                     
metadataProvider.getDataFormat().getFieldAccessEvaluatorFactory(
                             metadataProvider.getFunctionManager(), itemType, 
filterFieldName, recordColumn, sourceLoc);
-            Pair<IAType, Boolean> keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            Pair<IAType, Boolean> keyTypePair;
+            // since filter is not null, it's safe to cast to internal
+            if (((InternalDatasetDetails) 
dataset.getDatasetDetails()).getFilterSourceIndicator() == 0) {
+                keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            } else {
+                keyTypePair = 
Index.getNonNullableKeyFieldType(filterFieldName, metaType);
+            }
             IAType type = keyTypePair.first;
             ISerializerDeserializer serde = 
serdeProvider.getSerializerDeserializer(type);
             secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 6bcc039..fd45ff4 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -235,7 +235,9 @@ public abstract class SecondaryIndexOperationsHelper {
             secondaryBTreeFields[i] = i;
         }
 
-        IAType type = itemType.getSubFieldType(filterFieldName);
+        IAType type = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getFilterSourceIndicator() == 0
+                ? itemType.getSubFieldType(filterFieldName, itemType)
+                : metaType.getSubFieldType(filterFieldName, metaType);
         filterCmpFactories[0] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
         filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
         secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
diff --git 
a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
 
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
index 58f03af..73eeae4 100644
--- 
a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
+++ 
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslatorTest.java
@@ -50,7 +50,7 @@ public class DatasetTupleTranslatorTest {
                     
Collections.singletonList(Collections.singletonList("row_id")),
                     
Collections.singletonList(Collections.singletonList("row_id")),
                     indicator == null ? null : 
Collections.singletonList(indicator),
-                    Collections.singletonList(BuiltinType.AINT64), false, 
Collections.emptyList());
+                    Collections.singletonList(BuiltinType.AINT64), false, 
null, null);
 
             Dataset dataset = new 
Dataset(DataverseName.createSinglePartName("test"), "log",
                     DataverseName.createSinglePartName("foo"), "LogType", 
DataverseName.createSinglePartName("CB"),
diff --git 
a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
 
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
index cc02c49..5fcb6e0 100644
--- 
a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
+++ 
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslatorTest.java
@@ -61,7 +61,7 @@ public class IndexTupleTranslatorTest {
                     
Collections.singletonList(Collections.singletonList("row_id")),
                     
Collections.singletonList(Collections.singletonList("row_id")),
                     indicator == null ? null : 
Collections.singletonList(indicator),
-                    Collections.singletonList(BuiltinType.AINT64), false, 
Collections.emptyList());
+                    Collections.singletonList(BuiltinType.AINT64), false, 
null, null);
 
             DataverseName dvTest = DataverseName.createSinglePartName("test");
             DataverseName dvFoo = DataverseName.createSinglePartName("foo");
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 43f54f2..2adad12 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -37,7 +37,8 @@ public class LSMPrimaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
 
     private static final long serialVersionUID = 1L;
     protected final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    protected final ARecordType recordType;
+    protected final Integer filterSourceIndicator;
+    protected final ARecordType filterItemType;
     protected final int filterIndex;
     protected ISearchOperationCallbackFactory searchOpCallbackFactory;
     protected final int numPrimaryKeys;
@@ -49,15 +50,16 @@ public class LSMPrimaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
             IMissingWriterFactory missingWriterFactory,
             IModificationOperationCallbackFactory 
modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackFactory,
-            IFrameOperationCallbackFactory frameOpCallbackFactory, int 
numPrimaryKeys, ARecordType recordType,
-            int filterIndex, boolean hasSecondaries) {
+            IFrameOperationCallbackFactory frameOpCallbackFactory, int 
numPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType filterItemType, int filterIndex, boolean 
hasSecondaries) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, 
indexHelperFactory, null, true,
                 modificationOpCallbackFactory);
         this.frameOpCallbackFactory = frameOpCallbackFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numPrimaryKeys = numPrimaryKeys;
         this.missingWriterFactory = missingWriterFactory;
-        this.recordType = recordType;
+        this.filterSourceIndicator = filterSourceIndicator;
+        this.filterItemType = filterItemType;
         this.filterIndex = filterIndex;
         this.hasSecondaries = hasSecondaries;
     }
@@ -67,7 +69,7 @@ public class LSMPrimaryUpsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
         RecordDescriptor intputRecDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMPrimaryUpsertOperatorNodePushable(ctx, partition, 
indexHelperFactory, fieldPermutation,
-                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, 
numPrimaryKeys, recordType, filterIndex,
-                frameOpCallbackFactory, missingWriterFactory, hasSecondaries);
+                intputRecDesc, modCallbackFactory, searchOpCallbackFactory, 
numPrimaryKeys, filterSourceIndicator,
+                filterItemType, filterIndex, frameOpCallbackFactory, 
missingWriterFactory, hasSecondaries);
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 316665d..f5bddc5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -96,7 +96,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     protected boolean isFiltered = false;
     private final ArrayTupleReference prevTupleWithFilter = new 
ArrayTupleReference();
     private ArrayTupleBuilder prevRecWithPKWithFilterValue;
-    private ARecordType recordType;
+    private Integer filterSourceIndicator = null;
+    private ARecordType filterItemType;
     private int presetFieldIndex = -1;
     private ARecordPointable recPointable;
     private DataOutput prevDos;
@@ -117,8 +118,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] 
fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, int 
numOfPrimaryKeys, ARecordType recordType,
-            int filterFieldIndex, IFrameOperationCallbackFactory 
frameOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, int 
numOfPrimaryKeys, Integer filterSourceIndicator,
+            ARecordType filterItemType, int filterFieldIndex, 
IFrameOperationCallbackFactory frameOpCallbackFactory,
             IMissingWriterFactory missingWriterFactory, final boolean 
hasSecondaries) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, 
inputRecDesc, IndexOperation.UPSERT,
                 modCallbackFactory, null);
@@ -138,8 +139,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
         this.filterFieldIndex = numOfPrimaryKeys + (hasMeta ? 2 : 1);
         if (filterFieldIndex >= 0) {
             isFiltered = true;
-            this.recordType = recordType;
+            this.filterItemType = filterItemType;
             this.presetFieldIndex = filterFieldIndex;
+            this.filterSourceIndicator = filterSourceIndicator;
             this.recPointable = ARecordPointable.FACTORY.createPointable();
             this.prevRecWithPKWithFilterValue = new 
ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
@@ -393,13 +395,20 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                 prevDos.write(prevTuple.getFieldData(i), 
prevTuple.getFieldStart(i), prevTuple.getFieldLength(i));
                 prevRecWithPKWithFilterValue.addFieldEndOffset();
             }
-            recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), 
prevTuple.getFieldStart(numOfPrimaryKeys),
-                    prevTuple.getFieldLength(numOfPrimaryKeys));
+
+            if (filterSourceIndicator == 0) {
+                recPointable.set(prevTuple.getFieldData(numOfPrimaryKeys), 
prevTuple.getFieldStart(numOfPrimaryKeys),
+                        prevTuple.getFieldLength(numOfPrimaryKeys));
+            } else {
+                recPointable.set(prevTuple.getFieldData(metaFieldIndex), 
prevTuple.getFieldStart(metaFieldIndex),
+                        prevTuple.getFieldLength(metaFieldIndex));
+            }
             // copy the field data from prevTuple
-            byte tag = recPointable.getClosedFieldType(recordType, 
presetFieldIndex).getTypeTag().serialize();
+            byte tag = recPointable.getClosedFieldType(filterItemType, 
presetFieldIndex).getTypeTag().serialize();
             prevDos.write(tag);
-            prevDos.write(recPointable.getByteArray(), 
recPointable.getClosedFieldOffset(recordType, presetFieldIndex),
-                    recPointable.getClosedFieldSize(recordType, 
presetFieldIndex));
+            prevDos.write(recPointable.getByteArray(),
+                    recPointable.getClosedFieldOffset(filterItemType, 
presetFieldIndex),
+                    recPointable.getClosedFieldSize(filterItemType, 
presetFieldIndex));
             prevRecWithPKWithFilterValue.addFieldEndOffset();
             // prepare the tuple
             
prevTupleWithFilter.reset(prevRecWithPKWithFilterValue.getFieldEndOffsets(),

Reply via email to