This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba2d8bd2fc [ASTERIXDB-3144][RT] Pass partitions map to inverted index
ba2d8bd2fc is described below

commit ba2d8bd2fcaf1992177c78dfb326c9ccebd23531
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Apr 5 04:42:09 2023 -0700

    [ASTERIXDB-3144][RT] Pass partitions map to inverted index
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Pass partitions map to the inverted index runtime.
    - rename a few methods.
    
    Change-Id: I6ad1b0cd79f0f5e8e15da83330b8a52f9ac0108d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17463
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../operators/physical/BTreeSearchPOperator.java   |   2 +-
 .../operators/physical/InvertedIndexPOperator.java |  19 +-
 .../operators/physical/RTreeSearchPOperator.java   |   2 +-
 .../asterix/app/function/QueryIndexDatasource.java |   6 +-
 .../org/apache/asterix/utils/FeedOperations.java   |   2 +-
 .../metadata/declared/DatasetDataSource.java       |   4 +-
 .../metadata/declared/FunctionDataSource.java      |   4 +-
 .../metadata/declared/LoadableDataSource.java      |   2 +-
 .../metadata/declared/MetadataProvider.java        | 237 ++++++++++-----------
 .../metadata/declared/SampleDataSource.java        |   2 +-
 .../core/algebra/metadata/IMetadataProvider.java   |  31 +--
 .../dataflow/BTreeSearchOperatorDescriptor.java    |   8 +-
 .../dataflow/BTreeSearchOperatorNodePushable.java  |   4 +-
 .../dataflow/IndexSearchOperatorNodePushable.java  |   4 +-
 ...LSMBTreeBatchPointSearchOperatorDescriptor.java |   6 +-
 ...MBTreeBatchPointSearchOperatorNodePushable.java |   4 +-
 .../LSMInvertedIndexSearchOperatorDescriptor.java  |  10 +-
 ...LSMInvertedIndexSearchOperatorNodePushable.java |   6 +-
 18 files changed, 178 insertions(+), 175 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a7a383821b..b8eba74b00 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -162,7 +162,7 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
                         String.valueOf(unnestMap.getOperatorTag()));
         }
 
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = 
metadataProvider.buildBtreeRuntime(
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = 
metadataProvider.getBtreeSearchRuntime(
                 builder.getJobSpec(), opSchema, typeEnv, context, 
jobGenParams.getRetainInput(), retainMissing,
                 nonMatchWriterFactory, dataset, jobGenParams.getIndexName(), 
lowKeyIndexes, highKeyIndexes,
                 jobGenParams.isLowKeyInclusive(), 
jobGenParams.isHighKeyInclusive(), propagateFilter,
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 00eef690e1..5bdb2dba5a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -186,14 +186,17 @@ public class InvertedIndexPOperator extends 
IndexSearchPOperator {
         IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
secondarySplitsAndConstraint.first);
 
-        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new 
LSMInvertedIndexSearchOperatorDescriptor(
-                jobSpec, outputRecDesc, queryField, dataflowHelperFactory, 
queryTokenizerFactory,
-                fullTextConfigEvaluatorFactory, searchModifierFactory, 
retainInput, retainMissing,
-                nonMatchWriterFactory,
-                
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
 secondaryIndex,
-                        IndexOperation.SEARCH, null),
-                minFilterFieldIndexes, maxFilterFieldIndexes, 
isFullTextSearchQuery, numPrimaryKeys,
-                propagateIndexFilter, nonFilterWriterFactory, frameLimit);
+        int numPartitions = 
MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
+        int[][] partitionsMap = 
MetadataProvider.getPartitionsMap(numPartitions);
+
+        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+                new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, 
outputRecDesc, queryField, dataflowHelperFactory,
+                        queryTokenizerFactory, fullTextConfigEvaluatorFactory, 
searchModifierFactory, retainInput,
+                        retainMissing, nonMatchWriterFactory,
+                        
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
 secondaryIndex,
+                                IndexOperation.SEARCH, null),
+                        minFilterFieldIndexes, maxFilterFieldIndexes, 
isFullTextSearchQuery, numPrimaryKeys,
+                        propagateIndexFilter, nonFilterWriterFactory, 
frameLimit, partitionsMap);
         return new Pair<>(invIndexSearchOp, 
secondarySplitsAndConstraint.second);
     }
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 6534ebe89b..6b5adea536 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -106,7 +106,7 @@ public class RTreeSearchPOperator extends 
IndexSearchPOperator {
         }
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
-                mp.buildRtreeRuntime(builder.getJobSpec(), outputVars, 
opSchema, typeEnv, context,
+                mp.getRtreeSearchRuntime(builder.getJobSpec(), outputVars, 
opSchema, typeEnv, context,
                         jobGenParams.getRetainInput(), retainMissing, 
nonMatchWriterFactory, dataset,
                         jobGenParams.getIndexName(), keyIndexes, 
propagateIndexFilter, nonFilterWriterFactory,
                         minFilterFieldIndexes, maxFilterFieldIndexes, 
unnestMap.getGenerateCallBackProceedResultVar());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index f962142758..fda3845b7b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -105,9 +105,9 @@ public class QueryIndexDatasource extends 
FunctionDataSource {
             IVariableTypeEnvironment typeEnv, JobGenContext context, 
JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo<?> projectionInfo, 
IProjectionFiltrationInfo<?> metaProjectionInfo)
             throws AlgebricksException {
-        return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, 
context, true, false, null, ds, indexName,
-                null, null, true, true, false, null, null, null, 
tupleFilterFactory, outputLimit, false, false,
-                DefaultTupleProjectorFactory.INSTANCE, false);
+        return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, 
typeEnv, context, true, false, null, ds,
+                indexName, null, null, true, true, false, null, null, null, 
tupleFilterFactory, outputLimit, false,
+                false, DefaultTupleProjectorFactory.INSTANCE, false);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0732..07500f710f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -142,7 +142,7 @@ public class FeedOperations {
         IOperatorDescriptor feedIngestor;
         AlgebricksPartitionConstraint ingesterPc;
         Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
ITypedAdapterFactory> t =
-                metadataProvider.buildFeedIntakeRuntime(spec, feed, 
policyAccessor);
+                metadataProvider.getFeedIntakeRuntime(spec, feed, 
policyAccessor);
         feedIngestor = t.first;
         ingesterPc = t.second;
         adapterFactory = t.third;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index c77e032f87..89e99fda08 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -139,7 +139,7 @@ public class DatasetDataSource extends DataSource {
                 properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE, 
String.valueOf(externalScanBufferSize));
                 ITypedAdapterFactory adapterFactory = 
metadataProvider.getConfiguredAdapterFactory(externalDataset,
                         edd.getAdapter(), properties, (ARecordType) itemType, 
null, context.getWarningCollector());
-                return 
metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, 
adapterFactory,
+                return metadataProvider.getExternalDatasetScanRuntime(jobSpec, 
itemType, adapterFactory,
                         tupleFilterFactory, outputLimit);
             case INTERNAL:
                 DataSourceId id = getId();
@@ -163,7 +163,7 @@ public class DatasetDataSource extends DataSource {
 
                 int[] minFilterFieldIndexes = 
createFilterIndexes(minFilterVars, opSchema);
                 int[] maxFilterFieldIndexes = 
createFilterIndexes(maxFilterVars, opSchema);
-                return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, 
typeEnv, context, true, false, null,
+                return metadataProvider.getBtreeSearchRuntime(jobSpec, 
opSchema, typeEnv, context, true, false, null,
                         ((DatasetDataSource) dataSource).getDataset(), 
primaryIndex.getIndexName(), null, null, true,
                         true, false, null, minFilterFieldIndexes, 
maxFilterFieldIndexes, tupleFilterFactory,
                         outputLimit, false, false, tupleProjectorFactory, 
false);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 58377f449c..9f7d567410 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -115,8 +115,8 @@ public abstract class FunctionDataSource extends DataSource 
{
         dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         dataParserFactory.configure(Collections.emptyMap());
         adapterFactory.configure(factory, dataParserFactory);
-        return 
metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, 
adapterFactory,
-                tupleFilterFactory, outputLimit);
+        return metadataProvider.getExternalDatasetScanRuntime(jobSpec, 
itemType, adapterFactory, tupleFilterFactory,
+                outputLimit);
     }
 
     protected abstract IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index c7ccc531d9..fc65c4b7af 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -147,7 +147,7 @@ public class LoadableDataSource extends DataSource {
         ITypedAdapterFactory adapterFactory = 
metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
                 alds.getAdapter(), alds.getAdapterProperties(), itemType, 
null, context.getWarningCollector());
         RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, 
opSchema, context);
-        return metadataProvider.buildLoadableDatasetScan(jobSpec, 
adapterFactory, rDesc);
+        return metadataProvider.getLoadableDatasetScanRuntime(jobSpec, 
adapterFactory, rDesc);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index debf60efb4..486238959f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -188,7 +188,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     private ResultSetId resultSetId;
     private Counter resultSetIdCounter;
     private TxnId txnId;
-    private Map<String, Integer> externalDataLocks;
     private boolean blockingOperatorDisabled = false;
 
     public static MetadataProvider create(ICcApplicationContext appCtx, 
Dataverse defaultDataverse) {
@@ -326,14 +325,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return storageProperties;
     }
 
-    public Map<String, Integer> getExternalDataLocks() {
-        return externalDataLocks;
-    }
-
-    public void setExternalDataLocks(Map<String, Integer> locks) {
-        this.externalDataLocks = locks;
-    }
-
     private DataverseName getActiveDataverseName(DataverseName dataverseName) {
         return dataverseName != null ? dataverseName
                 : defaultDataverse != null ? 
defaultDataverse.getDataverseName() : null;
@@ -496,7 +487,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 context, jobSpec, implConfig, projectionInfo, 
metaProjectionInfo);
     }
 
-    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildLoadableDatasetScan(
+    protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getLoadableDatasetScanRuntime(
             JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, 
RecordDescriptor rDesc)
             throws AlgebricksException {
         ExternalScanOperatorDescriptor dataScanner = new 
ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
@@ -511,7 +502,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
     }
 
-    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
ITypedAdapterFactory> buildFeedIntakeRuntime(
+    public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
ITypedAdapterFactory> getFeedIntakeRuntime(
             JobSpecification jobSpec, Feed feed, FeedPolicyAccessor 
policyAccessor) throws Exception {
         Triple<ITypedAdapterFactory, RecordDescriptor, 
IDataSourceAdapter.AdapterType> factoryOutput;
         factoryOutput =
@@ -539,7 +530,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildBtreeRuntime(JobSpecification jobSpec,
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getBtreeSearchRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, 
JobGenContext context, boolean retainInput,
             boolean retainMissing, IMissingWriterFactory 
nonMatchWriterFactory, Dataset dataset, String indexName,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, 
boolean highKeyInclusive,
@@ -633,23 +624,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return new Pair<>(btreeSearchOp, spPc.second);
     }
 
-    public static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
-        if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
-            return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
-        } else {
-            return ((AlgebricksAbsolutePartitionConstraint) 
constraint).getLocations().length;
-        }
-    }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
-
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildRtreeRuntime(JobSpecification jobSpec,
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getRtreeSearchRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, 
IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing,
             IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, 
String indexName, int[] keyFields,
@@ -808,6 +783,53 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 additionalNonKeyFields, inputRecordDesc, context, spec, false, 
additionalNonFilteringFields);
     }
 
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getUpsertRuntime(
+            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, 
IVariableTypeEnvironment typeEnv,
+            List<LogicalVariable> primaryKeys, LogicalVariable payload, 
List<LogicalVariable> filterKeys,
+            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor 
recordDesc, JobGenContext context,
+            JobSpecification spec) throws AlgebricksException {
+        DataverseName dataverseName = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, 
datasetName, dataverseName);
+        }
+        int numKeys = primaryKeys.size();
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
+        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : 
additionalNonFilterFields.size();
+        // Move key fields to front. [keys, record, filters]
+        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + 
numOfAdditionalFields];
+        int[] bloomFilterKeyFields = new int[numKeys];
+        int i = 0;
+        // set the keys' permutations
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = inputSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            bloomFilterKeyFields[i] = i;
+            i++;
+        }
+        // set the record permutation
+        fieldPermutation[i++] = inputSchema.findVariable(payload);
+
+        // set the meta record permutation
+        if (additionalNonFilterFields != null) {
+            for (LogicalVariable var : additionalNonFilterFields) {
+                int idx = inputSchema.findVariable(var);
+                fieldPermutation[i++] = idx;
+            }
+        }
+
+        // set the filters' permutations.
+        if (numFilterFields > 0) {
+            int idx = inputSchema.findVariable(filterKeys.get(0));
+            fieldPermutation[i++] = idx;
+        }
+
+        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, 
fieldPermutation,
+                context.getMissingWriterFactory());
+    }
+
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexInsertRuntime(
             IDataSourceIndex<String, DataSourceId> dataSourceIndex, 
IOperatorSchema propagatedSchema,
@@ -816,9 +838,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, 
JobGenContext context, JobSpecification spec,
             boolean bulkload, List<List<AlgebricksPipeline>> 
secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, 
dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterExpr, null, recordDesc,
-                context, spec, bulkload, null, null, null, 
secondaryKeysPipelines, pipelineTopSchema);
+        return getIndexModificationRuntime(IndexOperation.INSERT, 
dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, 
filterExpr, null, recordDesc, context,
+                spec, bulkload, null, null, null, secondaryKeysPipelines, 
pipelineTopSchema);
     }
 
     @Override
@@ -829,9 +851,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, 
JobGenContext context, JobSpecification spec,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, 
IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, 
dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterExpr, null, recordDesc,
-                context, spec, false, null, null, null, 
secondaryKeysPipelines, pipelineTopSchema);
+        return getIndexModificationRuntime(IndexOperation.DELETE, 
dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, 
filterExpr, null, recordDesc, context,
+                spec, false, null, null, null, secondaryKeysPipelines, 
pipelineTopSchema);
     }
 
     @Override
@@ -843,9 +865,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> prevSecondaryKeys, LogicalVariable 
prevAdditionalFilteringKey,
             RecordDescriptor recordDesc, JobGenContext context, 
JobSpecification spec,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws 
AlgebricksException {
-        return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, 
dataSourceIndex, propagatedSchema,
-                inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalFilteringKeys, filterExpr, prevFilterExpr,
-                recordDesc, context, spec, false, operationVar, 
prevSecondaryKeys, prevAdditionalFilteringKey,
+        return getIndexModificationRuntime(IndexOperation.UPSERT, 
dataSourceIndex, propagatedSchema, inputSchemas,
+                typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, 
filterExpr, prevFilterExpr, recordDesc,
+                context, spec, false, operationVar, prevSecondaryKeys, 
prevAdditionalFilteringKey,
                 secondaryKeysPipelines, null);
     }
 
@@ -969,53 +991,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return null;
     }
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getUpsertRuntime(
-            IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, 
IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, LogicalVariable payload, 
List<LogicalVariable> filterKeys,
-            List<LogicalVariable> additionalNonFilterFields, RecordDescriptor 
recordDesc, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        DataverseName dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, 
datasetName, dataverseName);
-        }
-        int numKeys = primaryKeys.size();
-        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
-        int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : 
additionalNonFilterFields.size();
-        // Move key fields to front. [keys, record, filters]
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + 
numOfAdditionalFields];
-        int[] bloomFilterKeyFields = new int[numKeys];
-        int i = 0;
-        // set the keys' permutations
-        for (LogicalVariable varKey : primaryKeys) {
-            int idx = inputSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            bloomFilterKeyFields[i] = i;
-            i++;
-        }
-        // set the record permutation
-        fieldPermutation[i++] = inputSchema.findVariable(payload);
-
-        // set the meta record permutation
-        if (additionalNonFilterFields != null) {
-            for (LogicalVariable var : additionalNonFilterFields) {
-                int idx = inputSchema.findVariable(var);
-                fieldPermutation[i++] = idx;
-            }
-        }
-
-        // set the filters' permutations.
-        if (numFilterFields > 0) {
-            int idx = inputSchema.findVariable(filterKeys.get(0));
-            fieldPermutation[i++] = idx;
-        }
-
-        return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, 
fieldPermutation,
-                context.getMissingWriterFactory());
-    }
-
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
createPrimaryIndexUpsertOp(JobSpecification spec,
             MetadataProvider metadataProvider, Dataset dataset, 
RecordDescriptor inputRecordDesc,
             int[] fieldPermutation, IMissingWriterFactory 
missingWriterFactory) throws AlgebricksException {
@@ -1024,7 +999,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 missingWriterFactory);
     }
 
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDatasetDataScannerRuntime(
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getExternalDatasetScanRuntime(
             JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory 
adapterFactory,
             ITupleFilterFactory tupleFilterFactory, long outputLimit) throws 
AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1162,13 +1137,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 tupleFilterFactory, isPrimary, modCallbackFactory, 
tuplePartitionerFactory, partitionsMap);
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexInsertOrDeleteOrUpsertRuntime(
-            IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> 
dataSourceIndex,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, 
IVariableTypeEnvironment typeEnv,
-            List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, ILogicalExpression 
filterExpr,
-            ILogicalExpression prevFilterExpr, RecordDescriptor 
inputRecordDesc, JobGenContext context,
-            JobSpecification spec, boolean bulkload, LogicalVariable 
operationVar,
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexModificationRuntime(IndexOperation indexOp,
+            IDataSourceIndex<String, DataSourceId> dataSourceIndex, 
IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
+            ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr, 
RecordDescriptor inputRecordDesc,
+            JobGenContext context, JobSpecification spec, boolean bulkload, 
LogicalVariable operationVar,
             List<LogicalVariable> prevSecondaryKeys, LogicalVariable 
prevAdditionalFilteringKey,
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, 
IOperatorSchema pipelineTopSchema)
             throws AlgebricksException {
@@ -1202,33 +1176,33 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
         switch (secondaryIndex.getIndexType()) {
             case BTREE:
-                return getBTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
+                return getBTreeModificationRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, 
prevFilterFactory, inputRecordDesc,
                         context, spec, indexOp, bulkload, operationVar, 
prevSecondaryKeys, prevAdditionalFilteringKeys);
             case ARRAY:
                 if (bulkload) {
                     // In the case of bulk-load, we do not handle any nested 
plans. We perform the exact same behavior
                     // as a normal B-Tree bulk load.
-                    return getBTreeRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
-                            secondaryKeys, additionalNonKeyFields, 
filterFactory, prevFilterFactory, inputRecordDesc,
-                            context, spec, indexOp, bulkload, operationVar, 
prevSecondaryKeys,
+                    return getBTreeModificationRuntime(dataverseName, 
datasetName, indexName, propagatedSchema,
+                            primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterFactory, prevFilterFactory,
+                            inputRecordDesc, context, spec, indexOp, bulkload, 
operationVar, prevSecondaryKeys,
                             prevAdditionalFilteringKeys);
                 } else {
-                    return getArrayIndexRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
-                            additionalNonKeyFields, inputRecordDesc, spec, 
indexOp, operationVar,
+                    return getArrayIndexModificationRuntime(dataverseName, 
datasetName, indexName, propagatedSchema,
+                            primaryKeys, additionalNonKeyFields, 
inputRecordDesc, spec, indexOp, operationVar,
                             secondaryKeysPipelines);
                 }
             case RTREE:
-                return getRTreeRuntime(dataverseName, datasetName, indexName, 
propagatedSchema, primaryKeys,
+                return getRTreeModificationRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
                         secondaryKeys, additionalNonKeyFields, filterFactory, 
prevFilterFactory, inputRecordDesc,
                         context, spec, indexOp, bulkload, operationVar, 
prevSecondaryKeys, prevAdditionalFilteringKeys);
             case SINGLE_PARTITION_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case LENGTH_PARTITIONED_WORD_INVIX:
             case LENGTH_PARTITIONED_NGRAM_INVIX:
-                return getInvertedIndexRuntime(dataverseName, datasetName, 
indexName, propagatedSchema, primaryKeys,
-                        secondaryKeys, additionalNonKeyFields, filterFactory, 
prevFilterFactory, inputRecordDesc,
-                        context, spec, indexOp, secondaryIndex.getIndexType(), 
bulkload, operationVar,
+                return getInvertedIndexModificationRuntime(dataverseName, 
datasetName, indexName, propagatedSchema,
+                        primaryKeys, secondaryKeys, additionalNonKeyFields, 
filterFactory, prevFilterFactory,
+                        inputRecordDesc, context, spec, indexOp, 
secondaryIndex.getIndexType(), bulkload, operationVar,
                         prevSecondaryKeys, prevAdditionalFilteringKeys);
             default:
                 throw new AlgebricksException(
@@ -1236,13 +1210,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getBTreeRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory 
prevFilterFactory,
-            RecordDescriptor inputRecordDesc, JobGenContext context, 
JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload, LogicalVariable operationVar, 
List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getBTreeModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, 
IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, 
AsterixTupleFilterFactory filterFactory,
+            AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor 
inputRecordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
LogicalVariable operationVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
         int numKeys = primaryKeys.size() + secondaryKeys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
@@ -1332,10 +1307,11 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getArrayIndexRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> additionalNonKeyFields, RecordDescriptor 
inputRecordDesc, JobSpecification spec,
-            IndexOperation indexOp, LogicalVariable operationVar, 
List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getArrayIndexModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, 
IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
+            RecordDescriptor inputRecordDesc, JobSpecification spec, 
IndexOperation indexOp,
+            LogicalVariable operationVar, List<List<AlgebricksPipeline>> 
secondaryKeysPipelines)
             throws AlgebricksException {
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
@@ -1397,13 +1373,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getRTreeRuntime(DataverseName dataverseName,
-            String datasetName, String indexName, IOperatorSchema 
propagatedSchema, List<LogicalVariable> primaryKeys,
-            List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
-            AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory 
prevFilterFactory,
-            RecordDescriptor recordDesc, JobGenContext context, 
JobSpecification spec, IndexOperation indexOp,
-            boolean bulkload, LogicalVariable operationVar, 
List<LogicalVariable> prevSecondaryKeys,
-            List<LogicalVariable> prevAdditionalFilteringKeys) throws 
AlgebricksException {
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getRTreeModificationRuntime(
+            DataverseName dataverseName, String datasetName, String indexName, 
IOperatorSchema propagatedSchema,
+            List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
+            List<LogicalVariable> additionalNonKeyFields, 
AsterixTupleFilterFactory filterFactory,
+            AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor 
recordDesc, JobGenContext context,
+            JobSpecification spec, IndexOperation indexOp, boolean bulkload, 
LogicalVariable operationVar,
+            List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
+            throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = MetadataManager.INSTANCE
@@ -1506,7 +1483,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
-    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInvertedIndexRuntime(
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInvertedIndexModificationRuntime(
             DataverseName dataverseName, String datasetName, String indexName, 
IOperatorSchema propagatedSchema,
             List<LogicalVariable> primaryKeys, List<LogicalVariable> 
secondaryKeys,
             List<LogicalVariable> additionalNonKeyFields, 
AsterixTupleFilterFactory filterFactory,
@@ -1905,6 +1882,22 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         validateDatabaseObjectNameImpl(objectName, sourceLoc);
     }
 
+    public static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
+        if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+            return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
+        } else {
+            return ((AlgebricksAbsolutePartitionConstraint) 
constraint).getLocations().length;
+        }
+    }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
+
     private void validateDatabaseObjectNameImpl(String name, SourceLocation 
sourceLoc) throws AlgebricksException {
         if (name == null || name.isEmpty()) {
             throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, 
sourceLoc, "");
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 708c2c2047..448f5ce522 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -62,7 +62,7 @@ public class SampleDataSource extends DataSource {
             IVariableTypeEnvironment typeEnv, JobGenContext context, 
JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo<?> projectionInfo, 
IProjectionFiltrationInfo<?> metaProjectionInfo)
             throws AlgebricksException {
-        return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, 
context, true, false, null, dataset,
+        return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema, 
typeEnv, context, true, false, null, dataset,
                 sampleIndexName, null, null, true, true, false, null, null, 
null, tupleFilterFactory, outputLimit,
                 false, false, DefaultTupleProjectorFactory.INSTANCE, false);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4596393b25..4a6e76e16b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 
 public interface IMetadataProvider<S, I> {
-    IDataSource<S> findDataSource(S id) throws AlgebricksException;
 
     /**
      * Obs: A scanner may choose to contribute a null
@@ -78,6 +77,12 @@ public interface IMetadataProvider<S, I> {
             List<LogicalVariable> additionalNonFilteringFields, 
RecordDescriptor inputRecordDesc, JobGenContext context,
             JobSpecification jobSpec, boolean bulkload) throws 
AlgebricksException;
 
+    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getUpsertRuntime(IDataSource<S> dataSource,
+            IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> 
additionalFilterFields,
+            List<LogicalVariable> additionalNonFilteringFields, 
RecordDescriptor recordDesc, JobGenContext context,
+            JobSpecification jobSpec) throws AlgebricksException;
+
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getDeleteRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment 
typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> 
additionalNonKeyFields,
@@ -115,6 +120,14 @@ public interface IMetadataProvider<S, I> {
             List<List<AlgebricksPipeline>> secondaryKeysPipelines, 
IOperatorSchema pipelineTopSchema)
             throws AlgebricksException;
 
+    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexUpsertRuntime(
+            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema 
propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> 
primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression 
filterExpr,
+            ILogicalExpression prevFilterExpr, LogicalVariable operationVar, 
List<LogicalVariable> prevSecondaryKeys,
+            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor 
inputDesc, JobGenContext context,
+            JobSpecification spec, List<List<AlgebricksPipeline>> 
secondaryKeysPipelines) throws AlgebricksException;
+
     /**
      * Creates the delete runtime of IndexInsertDeletePOperator, which models
      * insert/delete operations into a secondary index.
@@ -170,24 +183,12 @@ public interface IMetadataProvider<S, I> {
             RecordDescriptor recordDesc, JobGenContext context, 
JobSpecification spec, boolean bulkload)
             throws AlgebricksException;
 
+    IDataSource<S> findDataSource(S id) throws AlgebricksException;
+
     IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) 
throws AlgebricksException;
 
     IFunctionInfo lookupFunction(FunctionIdentifier fid);
 
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getUpsertRuntime(IDataSource<S> dataSource,
-            IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> keys,
-            LogicalVariable payLoadVar, List<LogicalVariable> 
additionalFilterFields,
-            List<LogicalVariable> additionalNonFilteringFields, 
RecordDescriptor recordDesc, JobGenContext context,
-            JobSpecification jobSpec) throws AlgebricksException;
-
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexUpsertRuntime(
-            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema 
propagatedSchema, IOperatorSchema[] inputSchemas,
-            IVariableTypeEnvironment typeEnv, List<LogicalVariable> 
primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression 
filterExpr,
-            ILogicalExpression prevFilterExpr, LogicalVariable operationVar, 
List<LogicalVariable> prevSecondaryKeys,
-            LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor 
inputDesc, JobGenContext context,
-            JobSpecification spec, List<List<AlgebricksPipeline>> 
secondaryKeysPipelines) throws AlgebricksException;
-
     ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[] 
inputSchemas, IVariableTypeEnvironment typeEnv,
             ILogicalExpression filterExpr, JobGenContext context) throws 
AlgebricksException;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 1c961c5c68..c46391f5c6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -57,7 +57,7 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
     protected final long outputLimit;
     protected final ITupleProjectorFactory tupleProjectorFactory;
     protected final ITuplePartitionerFactory tuplePartitionerFactory;
-    protected final int[][] map;
+    protected final int[][] partitionsMap;
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, 
boolean highKeyInclusive,
@@ -79,7 +79,7 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -102,7 +102,7 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
         this.searchCallbackProceedResultTrueValue = 
searchCallbackProceedResultTrueValue;
         this.tupleProjectorFactory = tupleProjectorFactory;
         this.tuplePartitionerFactory = tuplePartitionerFactory;
-        this.map = map;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -114,7 +114,7 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, 
appendOpCallbackProceedResult,
                 searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, tupleProjectorFactory,
-                tuplePartitionerFactory, map);
+                tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 24163ea9d9..3fd0cf96a2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -68,12 +68,12 @@ public class BTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, nonMatchWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, 
appendOpCallbackProceedResult,
                 searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, projectorFactory,
-                tuplePartitionerFactory, map);
+                tuplePartitionerFactory, partitionsMap);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         if (lowKeyFields != null && lowKeyFields.length > 0) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c6cdd663b1..88c06eb91a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -128,10 +128,10 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendSearchCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
             byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
         this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-        this.partitions = map != null ? map[partition] : new int[] { partition 
};
+        this.partitions = partitionsMap != null ? partitionsMap[partition] : 
new int[] { partition };
         for (int i = 0; i < partitions.length; i++) {
             storagePartitionId2Index.put(partitions[i], i);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 9ed0782a89..4d8f1fe7af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -41,11 +41,11 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor 
extends BTreeSearchOpera
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, 
ITupleFilterFactory tupleFilterFactory,
             long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) {
         super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, 
highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, minFilterFieldIndexes,
                 maxFilterFieldIndexes, false, null, tupleFilterFactory, 
outputLimit, false, null, null,
-                tupleProjectorFactory, tuplePartitionerFactory, map);
+                tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
@@ -55,7 +55,7 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor 
extends BTreeSearchOpera
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), lowKeyFields, highKeyFields,
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, tupleFilterFactory,
-                outputLimit, tupleProjectorFactory, tuplePartitionerFactory, 
map);
+                outputLimit, tupleProjectorFactory, tuplePartitionerFactory, 
partitionsMap);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 47d515a3e3..8c8a550627 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -54,11 +54,11 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable 
extends BTreeSearchOpe
             IIndexDataflowHelperFactory indexHelperFactory, boolean 
retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
             ITupleFilterFactory tupleFilterFactory, long outputLimit, 
ITupleProjectorFactory tupleProjectorFactory,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) throws HyracksDataException {
         super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, 
lowKeyInclusive, highKeyInclusive,
                 minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, 
retainInput, retainMissing,
                 missingWriterFactory, searchCallbackFactory, false, null, 
tupleFilterFactory, outputLimit, false, null,
-                null, tupleProjectorFactory, tuplePartitionerFactory, map);
+                null, tupleProjectorFactory, tuplePartitionerFactory, 
partitionsMap);
         this.keyFields = lowKeyFields;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
index b5b951da0e..fcdf792c23 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
@@ -35,7 +35,7 @@ import 
org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigE
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 
 public class LSMInvertedIndexSearchOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final int queryField;
     private final IInvertedIndexSearchModifierFactory searchModifierFactory;
@@ -54,6 +54,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends 
AbstractSingleActi
     private final int numOfFields;
     // the maximum number of frames that this inverted-index-search can use
     private final int frameLimit;
+    private final int[][] partitionsMap;
 
     public 
LSMInvertedIndexSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             int queryField, IIndexDataflowHelperFactory indexHelperFactory,
@@ -62,7 +63,8 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends 
AbstractSingleActi
             IInvertedIndexSearchModifierFactory searchModifierFactory, boolean 
retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean 
isFullTextSearchQuery, int numOfFields,
-            boolean appendIndexFilter, IMissingWriterFactory 
nonFilterWriterFactory, int frameLimit) {
+            boolean appendIndexFilter, IMissingWriterFactory 
nonFilterWriterFactory, int frameLimit,
+            int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.queryTokenizerFactory = queryTokenizerFactory;
@@ -79,6 +81,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends 
AbstractSingleActi
         this.appendIndexFilter = appendIndexFilter;
         this.nonFilterWriterFactory = nonFilterWriterFactory;
         this.numOfFields = numOfFields;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
         this.frameLimit = frameLimit;
     }
@@ -91,6 +94,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends 
AbstractSingleActi
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), partition, minFilterFieldIndexes,
                 maxFilterFieldIndexes, indexHelperFactory, retainInput, 
retainMissing, missingWriterFactory,
                 searchCallbackFactory, searchModifier, queryTokenizerFactory, 
fullTextConfigEvaluatorFactory,
-                queryField, isFullTextSearchQuery, numOfFields, 
appendIndexFilter, nonFilterWriterFactory, frameLimit);
+                queryField, isFullTextSearchQuery, numOfFields, 
appendIndexFilter, nonFilterWriterFactory, frameLimit,
+                partitionsMap);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 996241daef..742a86c14a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -65,10 +65,12 @@ public class LSMInvertedIndexSearchOperatorNodePushable 
extends IndexSearchOpera
             IInvertedIndexSearchModifier searchModifier, 
IBinaryTokenizerFactory binaryTokenizerFactory,
             IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory, 
int queryFieldIndex,
             boolean isFullTextSearchQuery, int numOfFields, boolean 
appendIndexFilter,
-            IMissingWriterFactory nonFilterWriterFactory, int frameLimit) 
throws HyracksDataException {
+            IMissingWriterFactory nonFilterWriterFactory, int frameLimit, 
int[][] partitionsMap)
+            throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory, null, -1, false, null, null, 
DefaultTupleProjectorFactory.INSTANCE, null, null);
+                nonFilterWriterFactory, null, -1, false, null, null, 
DefaultTupleProjectorFactory.INSTANCE, null,
+                partitionsMap);
         this.searchModifier = searchModifier;
         this.binaryTokenizerFactory = binaryTokenizerFactory;
         this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;

Reply via email to