This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f4cdc2a9 [ASTERIXDB-3144][RT] Introduce DataPartitioningProvider
09f4cdc2a9 is described below

commit 09f4cdc2a9c879169808380bf64f4bda1af89390
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Wed May 3 01:30:41 2023 +0300

    [ASTERIXDB-3144][RT] Introduce DataPartitioningProvider
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add a storage partitioning scheme config (dynamic or static) and
      default it to dynamic.
    - Introduce DataPartitioningProvider which encapsulates the logic
      for dataset partitioning based on the partitioning scheme.
    
    Change-Id: Ia2bbc716fb4c2e9abca06e8f8629b15bd48bc7f3
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17503
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
    Tested-by: Murtadha Hubail <mhub...@apache.org>
---
 .../operators/physical/BTreeSearchPOperator.java   |   8 +-
 .../operators/physical/InvertedIndexPOperator.java |  19 +-
 .../rules/am/IntroduceSelectAccessMethodRule.java  |   7 +-
 .../asterix/app/cc/CcApplicationContext.java       |   9 +
 .../org/apache/asterix/utils/DataverseUtil.java    |  13 +-
 .../org/apache/asterix/utils/FlushDatasetUtil.java |   9 +-
 .../asterix/app/bootstrap/TestNodeController.java  |  16 +-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 +
 .../cluster_state_1_full.1.regexadm                |   1 +
 .../cluster_state_1_less.1.regexadm                |   1 +
 .../common/cluster/PartitioningProperties.java     |  56 +++++
 .../asterix/common/config/StorageProperties.java   |  12 +-
 .../common/dataflow/ICcApplicationContext.java     |   7 +
 .../common/dataflow/IDataPartitioningProvider.java |  22 ++
 .../asterix/common/utils/PartitioningScheme.java   |  52 +++++
 .../declared/DataSourcePartitioningProvider.java   |  12 +-
 .../metadata/declared/MetadataProvider.java        | 232 +++++++++------------
 .../metadata/utils/DataPartitioningProvider.java   |  93 +++++++++
 .../apache/asterix/metadata/utils/DatasetUtil.java |  79 ++++---
 .../metadata/utils/SampleOperationsHelper.java     |  25 ++-
 .../utils/SecondaryIndexOperationsHelper.java      |  33 ++-
 .../utils/SecondaryTreeIndexOperationsHelper.java  |  35 ++--
 .../tests/am/btree/AbstractBTreeOperatorTest.java  |   3 +-
 .../tests/am/rtree/AbstractRTreeOperatorTest.java  |   3 +-
 24 files changed, 483 insertions(+), 265 deletions(-)
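
The recurring change across these files is mechanical: call sites that used
to carry splits and constraints as an anonymous Pair now receive a single
PartitioningProperties value. A minimal before/after sketch (hypothetical
surrounding context; `dataset` and `indexName` stand in for the local
variables at each call site):

    // before: two related artifacts travel as an anonymous pair
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
            metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
    IFileSplitProvider splits = spPc.first;
    AlgebricksPartitionConstraint constraints = spPc.second;

    // after: one value object that also carries the compute-storage map
    PartitioningProperties props = metadataProvider.getPartitioningProperties(dataset, indexName);
    IFileSplitProvider splits = props.getSplitsProvider();
    AlgebricksPartitionConstraint constraints = props.getConstraints();
    int[][] computeStorageMap = props.getComputeStorageMap();
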

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 8865bb2241..6c6b2aa233 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -21,6 +21,7 @@ package org.apache.asterix.algebra.operators.physical;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -241,10 +242,9 @@ public class BTreeSearchPOperator extends IndexSearchPOperator {
                     propsLocal.add(new LocalOrderProperty(orderColumns));
                     MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
                     Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
-                    int[][] partitionsMap = mp.getPartitionsMap(dataset);
-                    pv[0] = new StructuralPropertiesVector(
-                            UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
-                            propsLocal);
+                    PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset);
+                    pv[0] = new StructuralPropertiesVector(UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars,
+                            domain, partitioningProperties.getComputeStorageMap()), propsLocal);
                     return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
                 }
             }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 5bdb2dba5a..20334bf277 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -20,6 +20,7 @@ package org.apache.asterix.algebra.operators.physical;
 
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSourceId;
@@ -58,7 +59,6 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -171,8 +171,7 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
         }
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
         RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
+        PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset, indexName);
         // TODO: Here we assume there is only one search key field.
         int queryField = keyFields[0];
         // Get tokenizer and search modifier factories.
@@ -183,12 +182,9 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
         IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory =
                 FullTextUtil.fetchFilterAndCreateConfigEvaluator(metadataProvider, secondaryIndex.getDataverseName(),
                         ((Index.TextIndexDetails) secondaryIndex.getIndexDetails()).getFullTextConfigName());
-        IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first);
-
-        int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
-        int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
-
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        partitioningProperties.getSplitsProvider());
         LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
                 new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
                         queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput,
@@ -196,7 +192,8 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
                         dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
                                 IndexOperation.SEARCH, null),
                         minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
-                        propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap);
-        return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
+                        propagateIndexFilter, nonFilterWriterFactory, frameLimit,
+                        partitioningProperties.getComputeStorageMap());
+        return new Pair<>(invIndexSearchOp, partitioningProperties.getConstraints());
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index 24a7fb25b4..7b8567f7aa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 
 import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -361,9 +362,9 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
             outputVars.add(outputVar);
             VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
         }
-
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
-        IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
+        PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(idx);
+        IntersectOperator intersect =
+                new IntersectOperator(outputVars, inputVars, partitioningProperties.getComputeStorageMap());
         intersect.setSourceLocation(lop.getSourceLocation());
         for (ILogicalOperator secondarySearch : subRoots) {
             intersect.getInputs().add(secondarySearch.getInputs().get(0));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 880880e851..a2d99a0332 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -47,6 +47,7 @@ import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
 import org.apache.asterix.common.external.IAdapterFactoryService;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
@@ -54,6 +55,7 @@ import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.storage.ICompressionManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.metadata.utils.DataPartitioningProvider;
 import org.apache.asterix.runtime.compression.CompressionManager;
 import org.apache.asterix.runtime.job.listener.NodeJobTracker;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
@@ -112,6 +114,7 @@ public class CcApplicationContext implements ICcApplicationContext {
     private final IConfigValidator configValidator;
     private final IAdapterFactoryService adapterFactoryService;
     private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
+    private final IDataPartitioningProvider dataPartitioningProvider;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -154,6 +157,7 @@ public class CcApplicationContext implements ICcApplicationContext {
         requestTracker = new RequestTracker(this);
         configValidator = configValidatorFactory.create();
         this.adapterFactoryService = adapterFactoryService;
+        dataPartitioningProvider = new DataPartitioningProvider(this);
     }
 
     @Override
@@ -357,4 +361,9 @@ public class CcApplicationContext implements ICcApplicationContext {
     public ReentrantReadWriteLock getCompilationLock() {
         return compilationLock;
     }
+
+    @Override
+    public IDataPartitioningProvider getDataPartitioningProvider() {
+        return dataPartitioningProvider;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index f470949a08..d5743cbdf9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -18,15 +18,13 @@
  */
 package org.apache.asterix.utils;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 
 public class DataverseUtil {
 
@@ -35,10 +33,11 @@ public class DataverseUtil {
 
     public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) {
         JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadata.splitAndConstraints(dataverse.getDataverseName());
-        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+        PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName());
+        FileRemoveOperatorDescriptor frod =
+                new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSplitsProvider(), false);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
+                partitioningProperties.getConstraints());
         jobSpec.addRoot(frod);
         return jobSpec;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index ff993276e1..f012a4e56a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.utils;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.transactions.TxnId;
@@ -29,7 +30,6 @@ import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
@@ -38,7 +38,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 
 public class FlushDatasetUtil {
     private FlushDatasetUtil() {
@@ -66,9 +65,9 @@ public class FlushDatasetUtil {
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, dataset.getDatasetName());
-        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, dataset.getDatasetName());
+        AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
                 primaryPartitionConstraint);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 3ddbfd97a3..682b636f7f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -214,7 +214,7 @@ public class TestNodeController {
                 fieldPermutation[i] = i;
             }
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -263,7 +263,7 @@ public class TestNodeController {
             }
 
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -372,7 +372,7 @@ public class TestNodeController {
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
             int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
             IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
             ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
                     primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -838,7 +838,7 @@ public class TestNodeController {
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
         int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
-        int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+        int[][] partitionsMap = getPartitionsMap(numPartitions);
         IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
         ITuplePartitionerFactory tuplePartitionerFactory =
                 new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -912,4 +912,12 @@ public class TestNodeController {
         }
         return new RecordDescriptor(outputSerDes, outputTypeTraits);
     }
+
+    private static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
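
For reference, this test helper builds the identity compute-to-storage
mapping, so each compute partition maps to exactly one storage partition
(worked illustration, not part of the patch):

    int[][] map = getPartitionsMap(3);
    // map == { {0}, {1}, {2} }: compute partition i owns storage partition i
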
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 4e96365008..801cfb09bc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -54,6 +54,7 @@
     "storage.compression.block" : "snappy",
     "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+    "storage.partitioning" : "dynamic",
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 415e96de01..e54bd70ba9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -54,6 +54,7 @@
     "storage.compression.block" : "snappy",
     "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+    "storage.partitioning" : "dynamic",
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index add09ca27c..c5496b26ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -54,6 +54,7 @@
     "storage.compression.block" : "snappy",
     "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
+    "storage.partitioning" : "dynamic",
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
     "txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java
new file mode 100644
index 0000000000..1580ca4ef8
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class PartitioningProperties {
+    private final IFileSplitProvider splitsProvider;
+    private final AlgebricksPartitionConstraint constraints;
+    private final int[][] computeStorageMap;
+
+    private PartitioningProperties(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints,
+            int[][] computeStorageMap) {
+        this.splitsProvider = splitsProvider;
+        this.constraints = constraints;
+        this.computeStorageMap = computeStorageMap;
+    }
+
+    public static PartitioningProperties of(IFileSplitProvider splitsProvider,
+            AlgebricksPartitionConstraint constraints, int[][] computeStorageMap) {
+        return new PartitioningProperties(splitsProvider, constraints, computeStorageMap);
+    }
+
+    public IFileSplitProvider getSplitsProvider() {
+        return splitsProvider;
+    }
+
+    public AlgebricksPartitionConstraint getConstraints() {
+        return constraints;
+    }
+
+    public int[][] getComputeStorageMap() {
+        return computeStorageMap;
+    }
+
+    public int getNumberOfPartitions() {
+        return splitsProvider.getFileSplits().length;
+    }
+}
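
PartitioningProperties is a plain value object; a typical construction
mirrors what call sites receive from the new provider (hedged sketch;
`splitsProvider`, `constraints`, and `map` are assumed to come from the
existing split/constraint utilities):

    PartitioningProperties props = PartitioningProperties.of(splitsProvider, constraints, map);
    int numPartitions = props.getNumberOfPartitions(); // == splitsProvider.getFileSplits().length
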
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 073da971f7..40bcfb01c7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
@@ -64,7 +65,8 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10)),
         STORAGE_COLUMN_MAX_TUPLE_COUNT(NONNEGATIVE_INTEGER, 15000),
         STORAGE_COLUMN_FREE_SPACE_TOLERANCE(DOUBLE, 0.15),
-        STORAGE_FORMAT(STRING, "row");
+        STORAGE_FORMAT(STRING, "row"),
+        STORAGE_PARTITIONING(STRING, "dynamic");
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -81,6 +83,7 @@ public class StorageProperties extends AbstractProperties {
                 case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE:
                 case STORAGE_GLOBAL_CLEANUP:
                 case STORAGE_GLOBAL_CLEANUP_TIMEOUT:
+                case STORAGE_PARTITIONING:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -139,6 +142,9 @@ public class StorageProperties extends AbstractProperties {
                             + " 0.15 means a physical page with 15% or less 
empty space is tolerable)";
                 case STORAGE_FORMAT:
                     return "The default storage format (either row or column)";
+                case STORAGE_PARTITIONING:
+                    return "The storage partitioning scheme (either dynamic or 
static). This value should not be changed"
+                            + " after any dataset have been created";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -287,4 +293,8 @@ public class StorageProperties extends AbstractProperties {
     public String getStorageFormat() {
         return accessor.getString(Option.STORAGE_FORMAT);
     }
+
+    public PartitioningScheme getPartitioningScheme() {
+        return PartitioningScheme.fromName(accessor.getString(Option.STORAGE_PARTITIONING));
+    }
 }
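
With this option in place, the scheme can be pinned cluster-wide in the
configuration (illustrative snippet, assuming the usual ini-style cc.conf;
"dynamic" is the default, so it only needs to be set to opt into static):

    [common]
    storage.partitioning=static
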
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index c22ac4c21a..281b069c74 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -158,4 +158,11 @@ public interface ICcApplicationContext extends IApplicationContext {
      * @return the cluster query compilation lock
      */
     ReentrantReadWriteLock getCompilationLock();
+
+    /**
+     * Gets the data partitioning provider
+     *
+     * @return the data partitioning provider
+     */
+    IDataPartitioningProvider getDataPartitioningProvider();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
new file mode 100644
index 0000000000..e59d4e730e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.dataflow;
+
+public interface IDataPartitioningProvider {
+}
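
The interface is deliberately an empty marker: it lets asterix-common expose
the provider through ICcApplicationContext without depending on the metadata
layer. Consumers that need the concrete API downcast, as MetadataProvider
does later in this change:

    DataPartitioningProvider dpp =
            (DataPartitioningProvider) appCtx.getDataPartitioningProvider();
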
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java
new file mode 100644
index 0000000000..bdfa7c59b0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum PartitioningScheme {
+    DYNAMIC("dynamic"),
+    STATIC("static");
+
+    private static final Map<String, PartitioningScheme> partitioningSchemes =
+            Collections.unmodifiableMap(Arrays.stream(PartitioningScheme.values())
+                    .collect(Collectors.toMap(PartitioningScheme::getStr, Function.identity())));
+
+    private final String str;
+
+    PartitioningScheme(String str) {
+        this.str = str;
+    }
+
+    public String getStr() {
+        return str;
+    }
+
+    public static PartitioningScheme fromName(String name) {
+        PartitioningScheme partitioningScheme = partitioningSchemes.get(name.toLowerCase());
+        if (partitioningScheme == null) {
+            throw new IllegalArgumentException("unknown partitioning scheme: " + name);
+        }
+        return partitioningScheme;
+    }
+}
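
fromName is the bridge from the config string to the enum; lookups are
case-insensitive and anything other than dynamic/static fails fast
(illustrative usage):

    PartitioningScheme.fromName("DYNAMIC"); // PartitioningScheme.DYNAMIC
    PartitioningScheme.fromName("static");  // PartitioningScheme.STATIC
    PartitioningScheme.fromName("foo");     // throws IllegalArgumentException
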
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
index f51e474466..3f3482aca9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
@@ -65,8 +66,10 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
                 String dsName = ((FeedDataSource) ds).getTargetDataset();
                 Dataset feedDs = ((MetadataProvider) ctx.getMetadataProvider())
                         .findDataset(ds.getId().getDataverseName(), dsName);
-                int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs);
-                pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1);
+                PartitioningProperties partitioningProperties =
+                        ((MetadataProvider) ctx.getMetadataProvider()).getPartitioningProperties(feedDs);
+                pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables,
+                        partitioningProperties.getComputeStorageMap());
                 break;
             case DataSource.Type.INTERNAL_DATASET:
             case DataSource.Type.SAMPLE:
@@ -77,8 +80,9 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv
                 } else {
                     dataset = ((SampleDataSource) ds).getDataset();
                 }
-                int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset);
-                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap);
+                int[][] computeStorageMap = ((MetadataProvider) ctx.getMetadataProvider())
+                        .getPartitioningProperties(dataset).getComputeStorageMap();
+                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, computeStorageMap);
                 propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
                 break;
             default:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 97d5c39dc9..c648b3351f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,7 +26,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +34,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.StorageProperties;
@@ -84,6 +84,7 @@ import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.Synonym;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.utils.DataPartitioningProvider;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.FullTextUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
@@ -106,7 +107,6 @@ import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPl
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -191,6 +191,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     private TxnId txnId;
     private boolean blockingOperatorDisabled = false;
 
+    private final DataPartitioningProvider dataPartitioningProvider;
+
     public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
         java.util.function.Function<ICcApplicationContext, IMetadataProvider<?, ?>> factory =
                 ((ICCExtensionManager) appCtx.getExtensionManager()).getMetadataProviderFactory();
@@ -204,6 +206,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         this.storageComponentProvider = appCtx.getStorageComponentProvider();
         storageProperties = appCtx.getStorageProperties();
         functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager();
+        dataPartitioningProvider = (DataPartitioningProvider) appCtx.getDataPartitioningProvider();
         locks = new LockList();
         config = new HashMap<>();
     }
@@ -575,8 +578,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
         int numPrimaryKeys = dataset.getPrimaryKeys().size();
         RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
+        PartitioningProperties datasetPartitioningProp = getPartitioningProperties(dataset, theIndex.getIndexName());
         int[] primaryKeyFields = new int[numPrimaryKeys];
         for (int i = 0; i < numPrimaryKeys; i++) {
             primaryKeyFields[i] = i;
@@ -600,15 +602,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 dataset.getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH,
                         primaryKeyFields, primaryKeyFieldsInSecondaryIndex, proceedIndexOnlyPlan);
         IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
-        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
+        IIndexDataflowHelperFactory indexHelperFactory =
+                new IndexDataflowHelperFactory(storageManager, datasetPartitioningProp.getSplitsProvider());
         BTreeSearchOperatorDescriptor btreeSearchOp;
 
-        int numPartitions = getNumPartitions(spPc.second);
-        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        int[][] partitionsMap = datasetPartitioningProp.getComputeStorageMap();
         ITuplePartitionerFactory tuplePartitionerFactory = null;
         if (partitionInputTuples) {
             IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-            tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, numPartitions);
+            tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories,
+                    datasetPartitioningProp.getNumberOfPartitions());
         }
 
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
@@ -627,7 +630,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         } else {
             btreeSearchOp = null;
         }
-        return new Pair<>(btreeSearchOp, spPc.second);
+        return new Pair<>(btreeSearchOp, datasetPartitioningProp.getConstraints());
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec,
@@ -645,8 +648,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
         Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails();
         RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+        PartitioningProperties partitioningProperties =
+                getPartitioningProperties(dataset, secondaryIndex.getIndexName());
         int[] primaryKeyFields = new int[numPrimaryKeys];
         for (int i = 0; i < numPrimaryKeys; i++) {
             primaryKeyFields[i] = i;
@@ -677,8 +680,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 dataset.getSearchCallbackFactory(storageComponentProvider, secondaryIndex, IndexOperation.SEARCH,
                         primaryKeyFields, primaryKeyFieldsInSecondaryIndex, isIndexOnlyPlan);
         RTreeSearchOperatorDescriptor rtreeSearchOp;
-        IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
                     indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory,
@@ -689,7 +692,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             rtreeSearchOp = null;
         }
 
-        return new Pair<>(rtreeSearchOp, spPc.second);
+        return new Pair<>(rtreeSearchOp, partitioningProperties.getConstraints());
     }
 
     @Override
@@ -755,26 +758,23 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             fieldPermutation[numKeys + 1] = idx;
         }
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset);
+        PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
         long numElementsHint = getCardinalityPerPartitionHint(dataset);
         // TODO
         // figure out the right behavior of the bulkload and then give the
         // right callback
         // (ex. what's the expected behavior when there is an error during
         // bulkload?)
-        int[][] partitionsMap = getPartitionsMap(dataset);
-        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count();
         IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory partitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
-        IIndexDataflowHelperFactory indexHelperFactory =
-                new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad =
-                new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation,
-                        StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory,
-                        null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
-        return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+        ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
+        IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
+        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null,
+                fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
+                indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+                partitioningProperties.getComputeStorageMap());
+        return new Pair<>(btreeBulkLoad, partitioningProperties.getConstraints());
     }
 
     @Override
@@ -971,9 +971,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 numKeyFields / 2);
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(DataverseName dataverseName) {
-        return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
-                dataverseName);
+    public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
+        return dataPartitioningProvider.splitAndConstraints(dataverseName);
     }
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
@@ -1079,8 +1078,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                 dataset.getDatasetName(), dataset.getDatasetName());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset);
+        PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
 
         // prepare callback
         int[] primaryKeyFields = new int[numKeys];
@@ -1089,21 +1087,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         }
         IModificationOperationCallbackFactory modificationCallbackFactory = dataset
                 .getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
-        IIndexDataflowHelperFactory idfh =
-                new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
-        int numPartitions = getNumPartitions(splitsAndConstraint.second);
-        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                partitioningProperties.getSplitsProvider());
         IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory partitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+        ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
 
         IOperatorDescriptor op;
         if (bulkload) {
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                     StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
-                    BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap);
+                    BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
+                    partitioningProperties.getComputeStorageMap());
         } else {
             if (indexOp == IndexOperation.INSERT) {
                 ISearchOperationCallbackFactory searchCallbackFactory = dataset
@@ -1114,21 +1110,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         .filter(Index::isPrimaryKeyIndex).findFirst();
                 IIndexDataflowHelperFactory pkidfh = null;
                 if (primaryKeyIndex.isPresent()) {
-                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint =
-                            getSplitProviderAndConstraints(dataset, primaryKeyIndex.get().getIndexName());
+                    PartitioningProperties idxPartitioningProperties =
+                            getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
                     pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
-                            primaryKeySplitsAndConstraint.first);
+                            idxPartitioningProperties.getSplitsProvider());
                 }
                 op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
-                        partitionsMap);
+                        partitioningProperties.getComputeStorageMap());
 
             } else {
                 op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        null, true, modificationCallbackFactory, partitionerFactory, partitionsMap);
+                        null, true, modificationCallbackFactory, partitionerFactory,
+                        partitioningProperties.getComputeStorageMap());
             }
         }
-        return new Pair<>(op, splitsAndConstraint.second);
+        return new Pair<>(op, partitioningProperties.getConstraints());
     }
 
     protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
@@ -1286,36 +1283,36 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
+            PartitioningProperties partitioningProperties =
+                    getPartitioningProperties(dataset, secondaryIndex.getIndexName());
             // prepare callback
             IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
                     storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
-                    storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
-
-            int numPartitions = getNumPartitions(splitsAndConstraint.second);
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+                    storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
             IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
-            ITuplePartitionerFactory partitionerFactory =
-                    new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+            ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                    partitioningProperties.getNumberOfPartitions());
 
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
-                        BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, partitionsMap);
+                        BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
+                        partitioningProperties.getComputeStorageMap());
             } else if (indexOp == IndexOperation.UPSERT) {
                 int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                         filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
+                        partitioningProperties.getComputeStorageMap());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        filterFactory, false, modificationCallbackFactory, 
partitionerFactory, partitionsMap);
+                        filterFactory, false, modificationCallbackFactory, 
partitionerFactory,
+                        partitioningProperties.getComputeStorageMap());
             }
-            return new Pair<>(op, splitsAndConstraint.second);
+            return new Pair<>(op, partitioningProperties.getConstraints());
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1355,19 +1352,17 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+
+            PartitioningProperties partitioningProperties =
+                    getPartitioningProperties(dataset, 
secondaryIndex.getIndexName());
             // Prepare callback.
             IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
                     storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
-                    storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
-
-            int numPartitions = getNumPartitions(splitsAndConstraint.second);
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+                    storageComponentProvider.getStorageManager(), 
partitioningProperties.getSpiltsProvider());
             IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(this);
-            ITuplePartitionerFactory tuplePartitionerFactory =
-                    new FieldHashPartitionerFactory(pkFields, 
pkHashFunFactories, numPartitions);
+            ITuplePartitionerFactory tuplePartitionerFactory = new 
FieldHashPartitionerFactory(pkFields,
+                    pkHashFunFactories, 
partitioningProperties.getNumberOfPartitions());
 
             IOperatorDescriptor op;
             if (indexOp == IndexOperation.UPSERT) {
@@ -1375,13 +1370,13 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 op = new 
LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, 
fieldPermutation,
                         idfh, modificationCallbackFactory, 
operationFieldIndex, BinaryIntegerInspector.FACTORY,
                         secondaryKeysPipelines.get(0), 
secondaryKeysPipelines.get(1), tuplePartitionerFactory,
-                        partitionsMap);
+                        partitioningProperties.getComputeStorageMap());
             } else {
                 op = new 
LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
                         fieldPermutation, indexOp, idfh, 
modificationCallbackFactory, secondaryKeysPipelines.get(0),
-                        tuplePartitionerFactory, partitionsMap);
+                        tuplePartitionerFactory, 
partitioningProperties.getComputeStorageMap());
             }
-            return new Pair<>(op, splitsAndConstraint.second);
+            return new Pair<>(op, partitioningProperties.getConstraints());
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1462,20 +1457,18 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 prevFieldPermutation[numKeys] = idx;
             }
         }
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+
+        PartitioningProperties partitioningProperties =
+                getPartitioningProperties(dataset, 
secondaryIndex.getIndexName());
 
         // prepare callback
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
                 storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
-        IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
-
-        int numPartitions = getNumPartitions(splitsAndConstraint.second);
-        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), 
partitioningProperties.getSpiltsProvider());
         IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory partitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, 
numPartitions);
+        ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
 
         IOperatorDescriptor op;
         if (bulkload) {
@@ -1483,19 +1476,19 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
                     StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false,
                     indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, 
dataset.getDatasetId(), filterFactory,
-                    partitionerFactory, partitionsMap);
+                    partitionerFactory, 
partitioningProperties.getComputeStorageMap());
         } else if (indexOp == IndexOperation.UPSERT) {
             int operationFieldIndex = 
propagatedSchema.findVariable(operationVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
                     indexDataflowHelperFactory, filterFactory, 
prevFilterFactory, modificationCallbackFactory,
                     operationFieldIndex, BinaryIntegerInspector.FACTORY, 
prevFieldPermutation, partitionerFactory,
-                    partitionsMap);
+                    partitioningProperties.getComputeStorageMap());
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, 
fieldPermutation, indexOp,
                     indexDataflowHelperFactory, filterFactory, false, 
modificationCallbackFactory, partitionerFactory,
-                    partitionsMap);
+                    partitioningProperties.getComputeStorageMap());
         }
-        return new Pair<>(op, splitsAndConstraint.second);
+        return new Pair<>(op, partitioningProperties.getConstraints());
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInvertedIndexModificationRuntime(
@@ -1583,20 +1576,18 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+
+            PartitioningProperties partitioningProperties =
+                    getPartitioningProperties(dataset, 
secondaryIndex.getIndexName());
 
             // prepare callback
             IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
                     storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory indexDataFlowFactory = new 
IndexDataflowHelperFactory(
-                    storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
-
-            int numPartitions = getNumPartitions(splitsAndConstraint.second);
-            int[][] partitionsMap = getPartitionsMap(numPartitions);
+                    storageComponentProvider.getStorageManager(), 
partitioningProperties.getSpiltsProvider());
             IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(this);
-            ITuplePartitionerFactory partitionerFactory =
-                    new FieldHashPartitionerFactory(pkFields, 
pkHashFunFactories, numPartitions);
+            ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                    partitioningProperties.getNumberOfPartitions());
 
             IOperatorDescriptor op;
             if (bulkload) {
@@ -1604,18 +1595,19 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
fieldPermutation,
                         StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, indexDataFlowFactory,
                         null, BulkLoadUsage.LOAD, dataset.getDatasetId(), 
filterFactory, partitionerFactory,
-                        partitionsMap);
+                        partitioningProperties.getComputeStorageMap());
             } else if (indexOp == IndexOperation.UPSERT) {
                 int upsertOperationFieldIndex = 
propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, 
recordDesc, fieldPermutation, indexDataFlowFactory,
                         filterFactory, prevFilterFactory, 
modificationCallbackFactory, upsertOperationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, 
partitionerFactory, partitionsMap);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, 
partitionerFactory,
+                        partitioningProperties.getComputeStorageMap());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, fieldPermutation, indexOp,
                         indexDataFlowFactory, filterFactory, false, 
modificationCallbackFactory, partitionerFactory,
-                        partitionsMap);
+                        partitioningProperties.getComputeStorageMap());
             }
-            return new Pair<>(op, splitsAndConstraint.second);
+            return new Pair<>(op, partitioningProperties.getConstraints());
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1757,8 +1749,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     FullTextUtil.fetchFilterAndCreateConfigEvaluator(this, 
secondaryIndex.getDataverseName(),
                             secondaryIndexDetails.getFullTextConfigName());
 
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+            PartitioningProperties partitioningProperties =
+                    getPartitioningProperties(dataset, 
secondaryIndex.getIndexName());
 
             // Generate Output Record format
             ISerializerDeserializer<?>[] tokenKeyPairFields = new 
ISerializerDeserializer[numTokenKeyPairFields];
@@ -1800,7 +1792,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, 
tokenKeyPairRecDesc, tokenizerFactory,
                     fullTextConfigEvaluatorFactory, docField, keyFields, 
isPartitioned, true, false,
                     MissingWriterFactory.INSTANCE);
-            return new Pair<>(tokenizerOp, splitsAndConstraint.second);
+            return new Pair<>(tokenizerOp, 
partitioningProperties.getConstraints());
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1830,15 +1822,19 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return storageComponentProvider;
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds)
-            throws AlgebricksException {
-        return getSplitProviderAndConstraints(ds, ds.getDatasetName());
+    public PartitioningProperties getPartitioningProperties(Index idx) throws 
AlgebricksException {
+        Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
+        return getPartitioningProperties(ds, idx.getIndexName());
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds,
-            String indexName) throws AlgebricksException {
-        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    public PartitioningProperties getPartitioningProperties(Dataset ds) throws 
AlgebricksException {
+        return getPartitioningProperties(ds, ds.getDatasetName());
+    }
+
+    public PartitioningProperties getPartitioningProperties(Dataset ds, String 
indexName) throws AlgebricksException {
+        //TODO(partitioning) pass splits rather than mdTxnCtx?
+        //        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
+        return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, 
ds, indexName);
     }
 
     public List<Pair<IFileSplitProvider, String>> 
getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
@@ -1898,34 +1894,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         validateDatabaseObjectNameImpl(objectName, sourceLoc);
     }
 
-    public int[][] getPartitionsMap(Dataset dataset) throws 
AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = 
getSplitProviderAndConstraints(dataset);
-        return getPartitionsMap(getNumPartitions(spPc.second));
-    }
-
-    public int[][] getPartitionsMap(Index idx) throws AlgebricksException {
-        Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                getSplitProviderAndConstraints(ds, idx.getIndexName());
-        return getPartitionsMap(getNumPartitions(spPc.second));
-    }
-
-    public static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
-        if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
-            return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
-        } else {
-            return ((AlgebricksAbsolutePartitionConstraint) 
constraint).getLocations().length;
-        }
-    }
-
-    public static int[][] getPartitionsMap(int numPartitions) {
-        int[][] map = new int[numPartitions][1];
-        for (int i = 0; i < numPartitions; i++) {
-            map[i] = new int[] { i };
-        }
-        return map;
-    }
-
     private void validateDatabaseObjectNameImpl(String name, SourceLocation 
sourceLoc) throws AlgebricksException {
         if (name == null || name.isEmpty()) {
             throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, 
sourceLoc, "");
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
new file mode 100644
index 0000000000..f5e96b1691
--- /dev/null
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
+import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataPartitioningProvider implements IDataPartitioningProvider {
+
+    private final ICcApplicationContext appCtx;
+    private final PartitioningScheme scheme;
+
+    public DataPartitioningProvider(ICcApplicationContext appCtx) {
+        this.appCtx = appCtx;
+        scheme = appCtx.getStorageProperties().getPartitioningScheme();
+    }
+
+    public PartitioningProperties splitAndConstraints(DataverseName 
dataverseName) {
+        if (scheme == DYNAMIC) {
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints = SplitsAndConstraintsUtil
+                    
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), 
dataverseName);
+            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
+            return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
+        } else if (scheme == STATIC) {
+            throw new NotImplementedException();
+        }
+        throw new IllegalStateException();
+    }
+
+    public PartitioningProperties 
getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+            String indexName) throws AlgebricksException {
+        if (scheme == DYNAMIC) {
+            FileSplit[] splits =
+                    SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints =
+                    
StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
+            return PartitioningProperties.of(splitsAndConstraints.first, 
splitsAndConstraints.second, partitionsMap);
+        } else if (scheme == STATIC) {
+            throw new NotImplementedException();
+        }
+        throw new IllegalStateException();
+    }
+
+    private static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
+        if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+            return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
+        } else {
+            return ((AlgebricksAbsolutePartitionConstraint) 
constraint).getLocations().length;
+        }
+    }
+
+    private static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
+}
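
In the DYNAMIC branch above, getPartitionsMap builds an identity compute-to-storage mapping: row i of the map lists the storage partitions handled by compute partition i, and each row holds exactly one entry. A self-contained illustration of the shape it produces:

    import java.util.Arrays;

    // Illustration only: reproduces the identity map built by getPartitionsMap(int).
    public class PartitionsMapDemo {
        public static void main(String[] args) {
            int numPartitions = 3;
            int[][] map = new int[numPartitions][1];
            for (int i = 0; i < numPartitions; i++) {
                map[i] = new int[] { i };
            }
            // prints [[0], [1], [2]]: compute partition i owns storage partition i
            System.out.println(Arrays.deepToString(map));
        }
    }

The int[][] shape presumably leaves room for the not-yet-implemented STATIC scheme, under which one compute partition could serve several storage partitions.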
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 89dea40264..27e06eb8cb 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.column.util.ColumnSecondaryIndexSchemaUtil;
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
@@ -309,15 +310,14 @@ public class DatasetUtil {
             return 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         }
         JobSpecification specPrimary = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
-        IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
-                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
-        IndexDropOperatorDescriptor primaryBtreeDrop =
-                new IndexDropOperatorDescriptor(specPrimary, 
indexHelperFactory, options, partitionsMap);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        IIndexDataflowHelperFactory indexHelperFactory =
+                new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        partitioningProperties.getSpiltsProvider());
+        IndexDropOperatorDescriptor primaryBtreeDrop = new 
IndexDropOperatorDescriptor(specPrimary, indexHelperFactory,
+                options, partitioningProperties.getComputeStorageMap());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary,
 primaryBtreeDrop,
-                splitsAndConstraint.second);
+                partitioningProperties.getConstraints());
         specPrimary.addRoot(primaryBtreeDrop);
         return specPrimary;
     }
@@ -334,14 +334,13 @@ public class DatasetUtil {
         itemType = (ARecordType) 
metadataProvider.findTypeForDatasetWithoutType(itemType, metaItemType, dataset);
 
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
-        FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        FileSplit[] fs = 
partitioningProperties.getSpiltsProvider().getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
             sb.append(fs[i] + " ");
         }
-        LOGGER.info("CREATING File Splits: " + sb);
+        LOGGER.info("CREATING File Splits: {}", sb);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
         // prepare a LocalResourceMetadata which will be stored in NC's local 
resource
@@ -350,12 +349,11 @@ public class DatasetUtil {
                 compactionInfo.first, compactionInfo.second);
         IndexBuilderFactory indexBuilderFactory =
                 new 
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        splitsAndConstraint.first, resourceFactory, true);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
-        IndexCreateOperatorDescriptor indexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
partitionsMap);
+                        partitioningProperties.getSpiltsProvider(), 
resourceFactory, true);
+        IndexCreateOperatorDescriptor indexCreateOp = new 
IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
+                partitioningProperties.getComputeStorageMap());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexCreateOp,
-                splitsAndConstraint.second);
+                partitioningProperties.getConstraints());
         spec.addRoot(indexCreateOp);
         return spec;
     }
@@ -368,16 +366,16 @@ public class DatasetUtil {
             throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, 
datasetName, dataverseName);
         }
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
-        IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
-                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        IIndexDataflowHelperFactory indexHelperFactory =
+                new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        partitioningProperties.getSpiltsProvider());
         LSMTreeIndexCompactOperatorDescriptor compactOp =
                 new LSMTreeIndexCompactOperatorDescriptor(spec, 
indexHelperFactory);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
-                splitsAndConstraint.second);
+                partitioningProperties.getConstraints());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
-                splitsAndConstraint.second);
+                partitioningProperties.getConstraints());
         spec.addRoot(compactOp);
         return spec;
     }
@@ -398,10 +396,9 @@ public class DatasetUtil {
 
     public static IOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider 
metadataProvider,
             Dataset dataset, ITupleProjectorFactory projectorFactory) throws 
AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
-        IFileSplitProvider primaryFileSplitProvider = 
primarySplitsAndConstraint.first;
-        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        IFileSplitProvider primaryFileSplitProvider = 
partitioningProperties.getSpiltsProvider();
+        AlgebricksPartitionConstraint primaryPartitionConstraint = 
partitioningProperties.getConstraints();
         // -Infinity
         int[] lowKeyFields = null;
         // +Infinity
@@ -412,11 +409,10 @@ public class DatasetUtil {
                 IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec,
                 dataset.getPrimaryRecordDescriptor(metadataProvider), 
lowKeyFields, highKeyFields, true, true,
                 indexHelperFactory, false, false, null, searchCallbackFactory, 
null, null, false, null, null, -1, false,
-                null, null, projectorFactory, null, partitionsMap);
+                null, null, projectorFactory, null, 
partitioningProperties.getComputeStorageMap());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
@@ -445,8 +441,7 @@ public class DatasetUtil {
 
         Index primaryIndex = 
metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
                 dataset.getDatasetName());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
 
         // prepare callback
         int[] primaryKeyFields = new int[numKeys];
@@ -462,8 +457,8 @@ public class DatasetUtil {
                 storageComponentProvider, primaryIndex, IndexOperation.UPSERT, 
primaryKeyFields);
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
                 storageComponentProvider, primaryIndex, IndexOperation.UPSERT, 
primaryKeyFields);
-        IIndexDataflowHelperFactory idfh =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
+        IIndexDataflowHelperFactory idfh = new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                partitioningProperties.getSpiltsProvider());
         LSMPrimaryUpsertOperatorDescriptor op;
         ITypeTraits[] outputTypeTraits = new 
ITypeTraits[inputRecordDesc.getFieldCount() + 1
                 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
@@ -518,18 +513,15 @@ public class DatasetUtil {
         ARecordType requestedType = getPrevRecordType(metadataProvider, 
dataset, itemType);
         ITupleProjectorFactory projectorFactory = 
IndexUtil.createUpsertTupleProjectorFactory(
                 dataset.getDatasetFormatInfo(), requestedType, itemType, 
metaItemType, numKeys);
-
-        int numPartitions = 
MetadataProvider.getNumPartitions(splitsAndConstraint.second);
-        int[][] partitionsMap = 
MetadataProvider.getPartitionsMap(numPartitions);
         IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(metadataProvider);
-        ITuplePartitionerFactory tuplePartitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, 
numPartitions);
-
+        ITuplePartitionerFactory tuplePartitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, 
fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, 
searchCallbackFactory,
                 dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, 
filterSourceIndicator, filterItemType,
-                fieldIdx, hasSecondaries, projectorFactory, 
tuplePartitionerFactory, partitionsMap);
-        return new Pair<>(op, splitsAndConstraint.second);
+                fieldIdx, hasSecondaries, projectorFactory, 
tuplePartitionerFactory,
+                partitioningProperties.getComputeStorageMap());
+        return new Pair<>(op, partitioningProperties.getConstraints());
     }
 
     /**
@@ -591,9 +583,8 @@ public class DatasetUtil {
      */
     public static IOperatorDescriptor 
createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
             MetadataProvider metadataProvider) throws AlgebricksException {
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
-        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        AlgebricksPartitionConstraint primaryPartitionConstraint = 
partitioningProperties.getConstraints();
 
         // Build dummy tuple containing one field with a dummy value inside.
         ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
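
In the upsert path above, the FieldHashPartitionerFactory routes each incoming tuple by hashing its primary-key fields, so a modification lands on the partition that owns that key. Conceptually (the exact hashing internals are not shown in this commit, so the modulo step below is an assumption):

    // Assumed routing semantics of the field-hash partitioner:
    //   partition = hash(tuple[pkFields]) % numberOfPartitions
    ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
            pkFields, pkHashFunFactories, partitioningProperties.getNumberOfPartitions());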
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 43b1fb9986..b6f0e9604c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -19,11 +19,11 @@
 
 package org.apache.asterix.metadata.utils;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.utils.StorageConstants;
@@ -114,6 +114,7 @@ public class SampleOperationsHelper implements 
ISecondaryIndexOperationsHelper {
     private ILSMMergePolicyFactory mergePolicyFactory;
     private Map<String, String> mergePolicyProperties;
     private int groupbyNumFrames;
+    private int[][] computeStorageMap;
 
     protected SampleOperationsHelper(Dataset dataset, Index index, 
MetadataProvider metadataProvider,
             SourceLocation sourceLoc) {
@@ -134,11 +135,11 @@ public class SampleOperationsHelper implements 
ISecondaryIndexOperationsHelper {
         comparatorFactories = 
dataset.getPrimaryComparatorFactories(metadataProvider, itemType, metaType);
         groupbyNumFrames = getGroupByNumFrames(metadataProvider, sourceLoc);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
secondarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
-        fileSplitProvider = secondarySplitsAndConstraint.first;
-        partitionConstraint = secondarySplitsAndConstraint.second;
-
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
+        fileSplitProvider = partitioningProperties.getSpiltsProvider();
+        partitionConstraint = partitioningProperties.getConstraints();
+        computeStorageMap = partitioningProperties.getComputeStorageMap();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
         mergePolicyFactory = compactionInfo.first;
@@ -153,9 +154,8 @@ public class SampleOperationsHelper implements 
ISecondaryIndexOperationsHelper {
                 mergePolicyFactory, mergePolicyProperties);
         IIndexBuilderFactory indexBuilderFactory = new 
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                 fileSplitProvider, resourceFactory, true);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
         IndexCreateOperatorDescriptor indexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
partitionsMap);
+                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
computeStorageMap);
         indexCreateOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexCreateOp, partitionConstraint);
         spec.addRoot(indexCreateOp);
@@ -318,19 +318,18 @@ public class SampleOperationsHelper implements 
ISecondaryIndexOperationsHelper {
     protected LSMIndexBulkLoadOperatorDescriptor 
createTreeIndexBulkLoadOp(JobSpecification spec,
             int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory, float fillFactor,
             long numElementHint) throws AlgebricksException {
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
         int[] pkFields = new int[dataset.getPrimaryKeys().size()];
         for (int i = 0; i < pkFields.length; i++) {
             pkFields[i] = fieldPermutation[i];
         }
-        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions 
-> partitions.length).count();
         IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(metadataProvider);
-        ITuplePartitionerFactory partitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, 
numPartitions);
+        ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
         LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
LSMIndexBulkLoadOperatorDescriptor(spec,
                 recordDesc, fieldPermutation, fillFactor, false, 
numElementHint, true, dataflowHelperFactory, null,
                 LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, 
dataset.getDatasetId(), null, partitionerFactory,
-                partitionsMap);
+                partitioningProperties.getComputeStorageMap());
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 partitionConstraint);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 02c05a912c..5d6c13c792 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,13 +19,13 @@
 
 package org.apache.asterix.metadata.utils;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -211,10 +211,10 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
         payloadSerde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
         metaSerde =
                 metaType == null ? null : 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
secondarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
-        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
+        secondaryFileSplitProvider = 
partitioningProperties.getSpiltsProvider();
+        secondaryPartitionConstraint = partitioningProperties.getConstraints();
         numPrimaryKeys = dataset.getPrimaryKeys().size();
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             filterFieldName = DatasetUtil.getFilterField(dataset);
@@ -223,10 +223,10 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
             } else {
                 numFilterFields = 0;
             }
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
-                    metadataProvider.getSplitProviderAndConstraints(dataset);
-            primaryFileSplitProvider = primarySplitsAndConstraint.first;
-            primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+            PartitioningProperties datasetPartitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+            primaryFileSplitProvider = 
datasetPartitioningProperties.getSpiltsProvider();
+            primaryPartitionConstraint = 
datasetPartitioningProperties.getConstraints();
             setPrimaryRecDescAndComparators();
         }
         setSecondaryRecDescAndComparators();
@@ -447,16 +447,15 @@ public abstract class SecondaryIndexOperationsHelper 
implements ISecondaryIndexO
             throws AlgebricksException {
         IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
-        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions 
-> partitions.length).count();
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
         IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(metadataProvider);
-        ITuplePartitionerFactory partitionerFactory =
-                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, 
numPartitions);
+        ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
+                partitioningProperties.getNumberOfPartitions());
         // when an index is being created (not loaded) the filtration is 
introduced in the pipeline -> no tuple filter
-        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp =
-                new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, 
fieldPermutation, fillFactor, false,
-                        numElementsHint, false, dataflowHelperFactory, 
primaryIndexDataflowHelperFactory,
-                        BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), 
null, partitionerFactory, partitionsMap);
+        LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new 
LSMIndexBulkLoadOperatorDescriptor(spec,
+                secondaryRecDesc, fieldPermutation, fillFactor, false, 
numElementsHint, false, dataflowHelperFactory,
+                primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, 
dataset.getDatasetId(), null,
+                partitionerFactory, 
partitioningProperties.getComputeStorageMap());
         treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
treeIndexBulkLoadOp,
                 secondaryPartitionConstraint);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 4fda6e449c..498f3d4882 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -23,19 +23,17 @@ import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDes
 
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -60,9 +58,9 @@ public abstract class SecondaryTreeIndexOperationsHelper 
extends SecondaryIndexO
                 mergePolicyFactory, mergePolicyProperties);
         IIndexBuilderFactory indexBuilderFactory = new 
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
                 secondaryFileSplitProvider, resourceFactory, true);
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
-        IndexCreateOperatorDescriptor secondaryIndexCreateOp =
-                new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, 
partitionsMap);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(dataset);
+        IndexCreateOperatorDescriptor secondaryIndexCreateOp = new 
IndexCreateOperatorDescriptor(spec,
+                indexBuilderFactory, 
partitioningProperties.getComputeStorageMap());
         secondaryIndexCreateOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
secondaryIndexCreateOp,
                 secondaryPartitionConstraint);
@@ -79,17 +77,17 @@ public abstract class SecondaryTreeIndexOperationsHelper 
extends SecondaryIndexO
     static JobSpecification buildDropJobSpecImpl(Dataset dataset, Index index, 
Set<DropOption> dropOptions,
             MetadataProvider metadataProvider, SourceLocation sourceLoc) 
throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
-        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
-                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first);
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        partitioningProperties.getSpiltsProvider());
         // The index drop operation should be persistent regardless of temp 
datasets or permanent dataset.
-        int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset);
-        IndexDropOperatorDescriptor btreeDrop =
-                new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, 
dropOptions, partitionsMap);
+        IndexDropOperatorDescriptor btreeDrop = new 
IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+                dropOptions, partitioningProperties.getComputeStorageMap());
         btreeDrop.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
btreeDrop,
-                splitsAndConstraint.second);
+                partitioningProperties.getConstraints());
         spec.addRoot(btreeDrop);
         return spec;
     }
@@ -97,10 +95,11 @@ public abstract class SecondaryTreeIndexOperationsHelper 
extends SecondaryIndexO
     @Override
     public JobSpecification buildCompactJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
-        IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
-                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first);
+        PartitioningProperties partitioningProperties =
+                metadataProvider.getPartitioningProperties(dataset, 
index.getIndexName());
+        IIndexDataflowHelperFactory dataflowHelperFactory =
+                new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+                        partitioningProperties.getSpiltsProvider());
         LSMTreeIndexCompactOperatorDescriptor compactOp =
                 new LSMTreeIndexCompactOperatorDescriptor(spec, 
dataflowHelperFactory);
         compactOp.setSourceLocation(sourceLoc);
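
Across the create, drop, and compact job builders above, the change follows one pattern: fetch PartitioningProperties once, derive the dataflow helper from its splits provider, hand its compute-storage map to the operator, and apply its constraints to the job spec. Condensed from the drop path above (all names appear in the diff):

    // Sketch of the recurring pattern, condensed from buildDropJobSpecImpl above.
    static JobSpecification buildDrop(Dataset dataset, Index index, Set<DropOption> dropOptions,
            MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException {
        JobSpecification spec =
                RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
        PartitioningProperties pp =
                metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
        IIndexDataflowHelperFactory dfhf = new IndexDataflowHelperFactory(
                metadataProvider.getStorageComponentProvider().getStorageManager(),
                pp.getSpiltsProvider());
        IndexDropOperatorDescriptor drop =
                new IndexDropOperatorDescriptor(spec, dfhf, dropOptions, pp.getComputeStorageMap());
        drop.setSourceLocation(sourceLoc);
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, drop,
                pp.getConstraints());
        spec.addRoot(drop);
        return spec;
    }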
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 1776b0970c..9d51420d6a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -211,7 +211,8 @@ public abstract class AbstractBTreeOperatorTest extends 
AbstractIntegrationTest
         // load secondary index
         int[] fieldPermutation = { 3, 0 };
         int[][] partitionsMap = TestUtils.getPartitionsMap(1);
-        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions 
-> partitions.length).count();
+        int numPartitions =
+                Arrays.stream(partitionsMap).map(partitions -> 
partitions.length).mapToInt(Integer::intValue).sum();
         ITuplePartitionerFactory tuplePartitionerFactory2 =
                 new FieldHashPartitionerFactory(secondaryPKFieldPermutationB, 
primaryHashFunFactories, numPartitions);
         TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad =
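
The test change above (repeated in AbstractRTreeOperatorTest below) fixes a latent bug in deriving numPartitions from a partitions map: Stream.count() returns the number of rows in the map, not the total number of partitions listed, and the two agree only for identity maps with one partition per row. A minimal comparison on a hypothetical non-identity map:

    import java.util.Arrays;

    public class CountVsSum {
        public static void main(String[] args) {
            int[][] partitionsMap = { { 0, 1 }, { 2 } }; // hypothetical non-identity map
            // old form: counts the rows of the map -> 2
            long rows = Arrays.stream(partitionsMap).map(p -> p.length).count();
            // fixed form: sums the row lengths -> 3, the actual number of partitions
            int total = Arrays.stream(partitionsMap).map(p -> p.length)
                    .mapToInt(Integer::intValue).sum();
            System.out.println(rows + " vs " + total); // prints "2 vs 3"
        }
    }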
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index d5ad7da59c..f1dbe5f16a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -344,7 +344,8 @@ public abstract class AbstractRTreeOperatorTest extends 
AbstractIntegrationTest
         int[] fieldPermutation = { 6, 7, 8, 9, 0 };
         int[] pkFields = { 4 };
         int[][] partitionsMap = TestUtils.getPartitionsMap(1);
-        int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions 
-> partitions.length).count();
+        int numPartitions =
+                Arrays.stream(partitionsMap).map(partitions -> 
partitions.length).mapToInt(Integer::intValue).sum();
         ITuplePartitionerFactory partitionerFactory =
                 new FieldHashPartitionerFactory(pkFields, 
primaryHashFactories, numPartitions);
         TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad =
