This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 09f4cdc2a9 [ASTERIXDB-3144][RT] Introduce DataPartitioningProvider 09f4cdc2a9 is described below commit 09f4cdc2a9c879169808380bf64f4bda1af89390 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Wed May 3 01:30:41 2023 +0300 [ASTERIXDB-3144][RT] Introduce DataPartitioningProvider - user model changes: no - storage format changes: no - interface changes: yes Details: - Add storage partitioning scheme config (dynamic or static) and default it to dynamic. - Introduce DataPartitioningProvider which encapsulates the logic for dataset partitioning based on the partitioning scheme. Change-Id: Ia2bbc716fb4c2e9abca06e8f8629b15bd48bc7f3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17503 Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> Tested-by: Murtadha Hubail <mhub...@apache.org> --- .../operators/physical/BTreeSearchPOperator.java | 8 +- .../operators/physical/InvertedIndexPOperator.java | 19 +- .../rules/am/IntroduceSelectAccessMethodRule.java | 7 +- .../asterix/app/cc/CcApplicationContext.java | 9 + .../org/apache/asterix/utils/DataverseUtil.java | 13 +- .../org/apache/asterix/utils/FlushDatasetUtil.java | 9 +- .../asterix/app/bootstrap/TestNodeController.java | 16 +- .../api/cluster_state_1/cluster_state_1.1.regexadm | 1 + .../cluster_state_1_full.1.regexadm | 1 + .../cluster_state_1_less.1.regexadm | 1 + .../common/cluster/PartitioningProperties.java | 56 +++++ .../asterix/common/config/StorageProperties.java | 12 +- .../common/dataflow/ICcApplicationContext.java | 7 + .../common/dataflow/IDataPartitioningProvider.java | 22 ++ .../asterix/common/utils/PartitioningScheme.java | 52 +++++ .../declared/DataSourcePartitioningProvider.java | 12 +- .../metadata/declared/MetadataProvider.java | 232 +++++++++------------ .../metadata/utils/DataPartitioningProvider.java | 93 +++++++++ 
.../apache/asterix/metadata/utils/DatasetUtil.java | 79 ++++--- .../metadata/utils/SampleOperationsHelper.java | 25 ++- .../utils/SecondaryIndexOperationsHelper.java | 33 ++- .../utils/SecondaryTreeIndexOperationsHelper.java | 35 ++-- .../tests/am/btree/AbstractBTreeOperatorTest.java | 3 +- .../tests/am/rtree/AbstractRTreeOperatorTest.java | 3 +- 24 files changed, 483 insertions(+), 265 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java index 8865bb2241..6c6b2aa233 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java @@ -21,6 +21,7 @@ package org.apache.asterix.algebra.operators.physical; import java.util.ArrayList; import java.util.List; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -241,10 +242,9 @@ public class BTreeSearchPOperator extends IndexSearchPOperator { propsLocal.add(new LocalOrderProperty(orderColumns)); MetadataProvider mp = (MetadataProvider) context.getMetadataProvider(); Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName()); - int[][] partitionsMap = mp.getPartitionsMap(dataset); - pv[0] = new StructuralPropertiesVector( - UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap), - propsLocal); + PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset); + pv[0] = new StructuralPropertiesVector(UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, + domain, 
partitioningProperties.getComputeStorageMap()), propsLocal); return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index 5bdb2dba5a..20334bf277 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -20,6 +20,7 @@ package org.apache.asterix.algebra.operators.physical; import static org.apache.asterix.common.utils.IdentifierUtil.dataset; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.declared.DataSourceId; @@ -58,7 +59,6 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; @@ -171,8 +171,7 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { } IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = - 
metadataProvider.getSplitProviderAndConstraints(dataset, indexName); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset, indexName); // TODO: Here we assume there is only one search key field. int queryField = keyFields[0]; // Get tokenizer and search modifier factories. @@ -183,12 +182,9 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory = FullTextUtil.fetchFilterAndCreateConfigEvaluator(metadataProvider, secondaryIndex.getDataverseName(), ((Index.TextIndexDetails) secondaryIndex.getIndexDetails()).getFullTextConfigName()); - IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first); - - int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second); - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); - + IIndexDataflowHelperFactory dataflowHelperFactory = + new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + partitioningProperties.getSpiltsProvider()); LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput, @@ -196,7 +192,8 @@ public class InvertedIndexPOperator extends IndexSearchPOperator { dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex, IndexOperation.SEARCH, null), minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys, - propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap); - return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second); + propagateIndexFilter, nonFilterWriterFactory, frameLimit, + 
partitioningProperties.getComputeStorageMap()); + return new Pair<>(invIndexSearchOp, partitioningProperties.getConstraints()); } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java index 24a7fb25b4..7b8567f7aa 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.TreeMap; import org.apache.asterix.algebra.operators.CommitOperator; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -361,9 +362,9 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth outputVars.add(outputVar); VariableUtilities.substituteVariables(lop, inputVar, outputVar, context); } - - int[][] partitionsMap = metadataProvider.getPartitionsMap(idx); - IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(idx); + IntersectOperator intersect = + new IntersectOperator(outputVars, inputVars, partitioningProperties.getComputeStorageMap()); intersect.setSourceLocation(lop.getSourceLocation()); for (ILogicalOperator secondarySearch : subRoots) { intersect.getInputs().add(secondarySearch.getInputs().get(0)); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java index 
880880e851..a2d99a0332 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java @@ -47,6 +47,7 @@ import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.dataflow.IDataPartitioningProvider; import org.apache.asterix.common.external.IAdapterFactoryService; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.metadata.IMetadataLockUtil; @@ -54,6 +55,7 @@ import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.storage.ICompressionManager; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.common.transactions.ITxnIdFactory; +import org.apache.asterix.metadata.utils.DataPartitioningProvider; import org.apache.asterix.runtime.compression.CompressionManager; import org.apache.asterix.runtime.job.listener.NodeJobTracker; import org.apache.asterix.runtime.transaction.ResourceIdManager; @@ -112,6 +114,7 @@ public class CcApplicationContext implements ICcApplicationContext { private final IConfigValidator configValidator; private final IAdapterFactoryService adapterFactoryService; private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true); + private final IDataPartitioningProvider dataPartitioningProvider; public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, @@ -154,6 +157,7 @@ public class CcApplicationContext implements ICcApplicationContext { requestTracker = new RequestTracker(this); configValidator = 
configValidatorFactory.create(); this.adapterFactoryService = adapterFactoryService; + dataPartitioningProvider = new DataPartitioningProvider(this); } @Override @@ -357,4 +361,9 @@ public class CcApplicationContext implements ICcApplicationContext { public ReentrantReadWriteLock getCompilationLock() { return compilationLock; } + + @Override + public IDataPartitioningProvider getDataPartitioningProvider() { + return dataPartitioningProvider; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java index f470949a08..d5743cbdf9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java @@ -18,15 +18,13 @@ */ package org.apache.asterix.utils; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.runtime.utils.RuntimeUtils; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; public class DataverseUtil { @@ -35,10 +33,11 @@ public class DataverseUtil { public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) { JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadata.splitAndConstraints(dataverse.getDataverseName()); - 
FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second); + PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName()); + FileRemoveOperatorDescriptor frod = + new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSpiltsProvider(), false); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, + partitioningProperties.getConstraints()); jobSpec.addRoot(frod); return jobSpec; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java index ff993276e1..f012a4e56a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.utils; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.transactions.TxnId; @@ -29,7 +30,6 @@ import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; @@ -38,7 
+38,6 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; public class FlushDatasetUtil { private FlushDatasetUtil() { @@ -66,9 +65,9 @@ public class FlushDatasetUtil { spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, dataset.getDatasetName()); - AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, dataset.getDatasetName()); + AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints(); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource, primaryPartitionConstraint); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 3ddbfd97a3..682b636f7f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -214,7 +214,7 @@ public class TestNodeController { fieldPermutation[i] = i; } int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); + int[][] partitionsMap = getPartitionsMap(numPartitions); IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; ITuplePartitionerFactory 
tuplePartitionerFactory = new FieldHashPartitionerFactory( primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); @@ -263,7 +263,7 @@ public class TestNodeController { } int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); + int[][] partitionsMap = getPartitionsMap(numPartitions); IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory( primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); @@ -372,7 +372,7 @@ public class TestNodeController { IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); + int[][] partitionsMap = getPartitionsMap(numPartitions); IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory( primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); @@ -838,7 +838,7 @@ public class TestNodeController { IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); + int[][] partitionsMap = getPartitionsMap(numPartitions); IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, 
numPartitions); @@ -912,4 +912,12 @@ public class TestNodeController { } return new RecordDescriptor(outputSerDes, outputTypeTraits); } + + private static int[][] getPartitionsMap(int numPartitions) { + int[][] map = new int[numPartitions][1]; + for (int i = 0; i < numPartitions; i++) { + map[i] = new int[] { i }; + } + return map; + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 4e96365008..801cfb09bc 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -54,6 +54,7 @@ "storage.compression.block" : "snappy", "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, + "storage.partitioning" : "dynamic", "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, "txn\.dataset\.checkpoint\.interval" : 3600, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 415e96de01..e54bd70ba9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -54,6 +54,7 @@ "storage.compression.block" : "snappy", "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, + "storage.partitioning" : "dynamic", "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, "txn\.dataset\.checkpoint\.interval" : 3600, diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index add09ca27c..c5496b26ee 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -54,6 +54,7 @@ "storage.compression.block" : "snappy", "storage.global.cleanup.timeout" : 600, "storage.lsm.bloomfilter.falsepositiverate" : 0.01, + "storage.partitioning" : "dynamic", "txn\.commitprofiler\.enabled" : false, "txn\.commitprofiler\.reportinterval" : 5, "txn\.dataset\.checkpoint\.interval" : 3600, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java new file mode 100644 index 0000000000..1580ca4ef8 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/PartitioningProperties.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.cluster; + +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class PartitioningProperties { + private final IFileSplitProvider splitsProvider; + private final AlgebricksPartitionConstraint constraints; + private final int[][] computeStorageMap; + + private PartitioningProperties(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints, + int[][] computeStorageMap) { + this.splitsProvider = splitsProvider; + this.constraints = constraints; + this.computeStorageMap = computeStorageMap; + } + + public static PartitioningProperties of(IFileSplitProvider splitsProvider, + AlgebricksPartitionConstraint constraints, int[][] computeStorageMap) { + return new PartitioningProperties(splitsProvider, constraints, computeStorageMap); + } + + public IFileSplitProvider getSpiltsProvider() { + return splitsProvider; + } + + public AlgebricksPartitionConstraint getConstraints() { + return constraints; + } + + public int[][] getComputeStorageMap() { + return computeStorageMap; + } + + public int getNumberOfPartitions() { + return splitsProvider.getFileSplits().length; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index 073da971f7..40bcfb01c7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import 
org.apache.asterix.common.utils.PartitioningScheme; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; @@ -64,7 +65,8 @@ public class StorageProperties extends AbstractProperties { STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10)), STORAGE_COLUMN_MAX_TUPLE_COUNT(NONNEGATIVE_INTEGER, 15000), STORAGE_COLUMN_FREE_SPACE_TOLERANCE(DOUBLE, 0.15), - STORAGE_FORMAT(STRING, "row"); + STORAGE_FORMAT(STRING, "row"), + STORAGE_PARTITIONING(STRING, "dynamic"); private final IOptionType interpreter; private final Object defaultValue; @@ -81,6 +83,7 @@ public class StorageProperties extends AbstractProperties { case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE: case STORAGE_GLOBAL_CLEANUP: case STORAGE_GLOBAL_CLEANUP_TIMEOUT: + case STORAGE_PARTITIONING: return Section.COMMON; default: return Section.NC; @@ -139,6 +142,9 @@ public class StorageProperties extends AbstractProperties { + " 0.15 means a physical page with 15% or less empty space is tolerable)"; case STORAGE_FORMAT: return "The default storage format (either row or column)"; + case STORAGE_PARTITIONING: + return "The storage partitioning scheme (either dynamic or static). 
This value should not be changed" + + " after any dataset have been created"; default: throw new IllegalStateException("NYI: " + this); } @@ -287,4 +293,8 @@ public class StorageProperties extends AbstractProperties { public String getStorageFormat() { return accessor.getString(Option.STORAGE_FORMAT); } + + public PartitioningScheme getPartitioningScheme() { + return PartitioningScheme.fromName(accessor.getString(Option.STORAGE_PARTITIONING)); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index c22ac4c21a..281b069c74 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -158,4 +158,11 @@ public interface ICcApplicationContext extends IApplicationContext { * @return the cluster query compilation lock */ ReentrantReadWriteLock getCompilationLock(); + + /** + * Gets the data partitioing provider + * + * @return the data partitioing provider + */ + IDataPartitioningProvider getDataPartitioningProvider(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java new file mode 100644 index 0000000000..e59d4e730e --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.dataflow; + +public interface IDataPartitioningProvider { +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java new file mode 100644 index 0000000000..bdfa7c59b0 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/PartitioningScheme.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.utils; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum PartitioningScheme { + DYNAMIC("dynamic"), + STATIC("static"); + + private static final Map<String, PartitioningScheme> partitioningSchemes = + Collections.unmodifiableMap(Arrays.stream(PartitioningScheme.values()) + .collect(Collectors.toMap(PartitioningScheme::getStr, Function.identity()))); + + private final String str; + + PartitioningScheme(String str) { + this.str = str; + } + + public String getStr() { + return str; + } + + public static PartitioningScheme fromName(String name) { + PartitioningScheme partitioningScheme = partitioningSchemes.get(name.toLowerCase()); + if (partitioningScheme == null) { + throw new IllegalArgumentException("unknonw partitioning scheme: " + name); + } + return partitioningScheme; + } +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java index f51e474466..3f3482aca9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourcePartitioningProvider.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.ListSet; @@ -65,8 +66,10 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv String dsName = ((FeedDataSource) ds).getTargetDataset(); Dataset feedDs = 
((MetadataProvider) ctx.getMetadataProvider()) .findDataset(ds.getId().getDataverseName(), dsName); - int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs); - pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1); + PartitioningProperties partitioningProperties = + ((MetadataProvider) ctx.getMetadataProvider()).getPartitioningProperties(feedDs); + pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, + partitioningProperties.getComputeStorageMap()); break; case DataSource.Type.INTERNAL_DATASET: case DataSource.Type.SAMPLE: @@ -77,8 +80,9 @@ public class DataSourcePartitioningProvider implements IDataSourcePropertiesProv } else { dataset = ((SampleDataSource) ds).getDataset(); } - int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset); - pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap); + int[][] computeStorageMap = ((MetadataProvider) ctx.getMetadataProvider()) + .getPartitioningProperties(dataset).getComputeStorageMap(); + pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, computeStorageMap); propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars))); break; default: diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 97d5c39dc9..c648b3351f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -26,7 +26,6 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ 
-35,6 +34,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.StorageProperties; @@ -84,6 +84,7 @@ import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.Synonym; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; +import org.apache.asterix.metadata.utils.DataPartitioningProvider; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.FullTextUtil; import org.apache.asterix.metadata.utils.IndexUtil; @@ -106,7 +107,6 @@ import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPl import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -191,6 +191,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private TxnId txnId; private boolean blockingOperatorDisabled = false; + private final DataPartitioningProvider dataPartitioningProvider; + public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) { java.util.function.Function<ICcApplicationContext, IMetadataProvider<?, ?>> factory = ((ICCExtensionManager) 
appCtx.getExtensionManager()).getMetadataProviderFactory(); @@ -204,6 +206,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> this.storageComponentProvider = appCtx.getStorageComponentProvider(); storageProperties = appCtx.getStorageProperties(); functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager(); + dataPartitioningProvider = (DataPartitioningProvider) appCtx.getDataPartitioningProvider(); locks = new LockList(); config = new HashMap<>(); } @@ -575,8 +578,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> int numPrimaryKeys = dataset.getPrimaryKeys().size(); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); + PartitioningProperties datasetPartitioningProp = getPartitioningProperties(dataset, theIndex.getIndexName()); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; @@ -600,15 +602,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> dataset.getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields, primaryKeyFieldsInSecondaryIndex, proceedIndexOnlyPlan); IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); - IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); + IIndexDataflowHelperFactory indexHelperFactory = + new IndexDataflowHelperFactory(storageManager, datasetPartitioningProp.getSpiltsProvider()); BTreeSearchOperatorDescriptor btreeSearchOp; - int numPartitions = getNumPartitions(spPc.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + int[][] partitionsMap = datasetPartitioningProp.getComputeStorageMap(); 
ITuplePartitionerFactory tuplePartitionerFactory = null; if (partitionInputTuples) { IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, numPartitions); + tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, + datasetPartitioningProp.getNumberOfPartitions()); } if (dataset.getDatasetType() == DatasetType.INTERNAL) { @@ -627,7 +630,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } else { btreeSearchOp = null; } - return new Pair<>(btreeSearchOp, spPc.second); + return new Pair<>(btreeSearchOp, datasetPartitioningProp.getConstraints()); } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec, @@ -645,8 +648,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails(); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; @@ -677,8 +680,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> dataset.getSearchCallbackFactory(storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields, primaryKeyFieldsInSecondaryIndex, isIndexOnlyPlan); RTreeSearchOperatorDescriptor rtreeSearchOp; - IIndexDataflowHelperFactory indexDataflowHelperFactory = - new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); if (dataset.getDatasetType() == DatasetType.INTERNAL) { rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory, @@ -689,7 +692,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> rtreeSearchOp = null; } - return new Pair<>(rtreeSearchOp, spPc.second); + return new Pair<>(rtreeSearchOp, partitioningProperties.getConstraints()); } @Override @@ -755,26 +758,23 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> fieldPermutation[numKeys + 1] = idx; } - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset); + PartitioningProperties partitioningProperties = getPartitioningProperties(dataset); long numElementsHint = getCardinalityPerPartitionHint(dataset); // TODO // figure out the right behavior of the bulkload and then give the // right callback // (ex. what's the expected behavior when there is an error during // bulkload?) 
- int[][] partitionsMap = getPartitionsMap(dataset); - int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count(); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); - IIndexDataflowHelperFactory indexHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = - new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, - StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory, - null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap); - return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); + IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); + LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null, + fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, + indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, + partitioningProperties.getComputeStorageMap()); + return new Pair<>(btreeBulkLoad, partitioningProperties.getConstraints()); } @Override @@ -971,9 +971,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> numKeyFields / 2); } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(DataverseName dataverseName) { - return 
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), - dataverseName); + public PartitioningProperties splitAndConstraints(DataverseName dataverseName) { + return dataPartitioningProvider.splitAndConstraints(dataverseName); } public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) @@ -1079,8 +1078,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset); + PartitioningProperties partitioningProperties = getPartitioningProperties(dataset); // prepare callback int[] primaryKeyFields = new int[numKeys]; @@ -1089,21 +1087,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } IModificationOperationCallbackFactory modificationCallbackFactory = dataset .getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); - IIndexDataflowHelperFactory idfh = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - - int numPartitions = getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), + partitioningProperties.getSpiltsProvider()); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + 
partitioningProperties.getNumberOfPartitions()); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null, - BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, partitionsMap); + BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } else { if (indexOp == IndexOperation.INSERT) { ISearchOperationCallbackFactory searchCallbackFactory = dataset @@ -1114,21 +1110,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> .filter(Index::isPrimaryKeyIndex).findFirst(); IIndexDataflowHelperFactory pkidfh = null; if (primaryKeyIndex.isPresent()) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint = - getSplitProviderAndConstraints(dataset, primaryKeyIndex.get().getIndexName()); + PartitioningProperties idxPartitioningProperties = + getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName()); pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), - primaryKeySplitsAndConstraint.first); + idxPartitioningProperties.getSpiltsProvider()); } op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh, modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } else { op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, - null, true, modificationCallbackFactory, partitionerFactory, partitionsMap); + null, true, modificationCallbackFactory, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } } - return new Pair<>(op, splitsAndConstraint.second); + return new 
Pair<>(op, partitioningProperties.getConstraints()); } protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec, @@ -1286,36 +1283,36 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> // Index parameters. Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); // prepare callback IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( - storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - - int numPartitions = getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null, - BulkLoadUsage.LOAD, dataset.getDatasetId(), 
filterFactory, partitionerFactory, partitionsMap); + BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } else if (indexOp == IndexOperation.UPSERT) { int operationFieldIndex = propagatedSchema.findVariable(operationVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex, - BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap); + BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, - filterFactory, false, modificationCallbackFactory, partitionerFactory, partitionsMap); + filterFactory, false, modificationCallbackFactory, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } - return new Pair<>(op, splitsAndConstraint.second); + return new Pair<>(op, partitioningProperties.getConstraints()); } catch (Exception e) { throw new AlgebricksException(e); } @@ -1355,19 +1352,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> // Index parameters. Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); // Prepare callback. 
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( - storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - - int numPartitions = getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory tuplePartitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(pkFields, + pkHashFunFactories, partitioningProperties.getNumberOfPartitions()); IOperatorDescriptor op; if (indexOp == IndexOperation.UPSERT) { @@ -1375,13 +1370,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY, secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1), tuplePartitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } else { op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, modificationCallbackFactory, secondaryKeysPipelines.get(0), - tuplePartitionerFactory, partitionsMap); + tuplePartitionerFactory, partitioningProperties.getComputeStorageMap()); } - return new Pair<>(op, splitsAndConstraint.second); + return new Pair<>(op, partitioningProperties.getConstraints()); } catch (Exception e) { throw new AlgebricksException(e); } @@ -1462,20 +1457,18 @@ public class 
MetadataProvider implements IMetadataProvider<DataSourceId, String> prevFieldPermutation[numKeys] = idx; } } - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); // prepare callback IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); - IIndexDataflowHelperFactory indexDataflowHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - - int numPartitions = getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); IOperatorDescriptor op; if (bulkload) { @@ -1483,19 +1476,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, - partitionerFactory, partitionsMap); + partitionerFactory, partitioningProperties.getComputeStorageMap()); } else if (indexOp == 
IndexOperation.UPSERT) { int operationFieldIndex = propagatedSchema.findVariable(operationVar); op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } - return new Pair<>(op, splitsAndConstraint.second); + return new Pair<>(op, partitioningProperties.getConstraints()); } private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexModificationRuntime( @@ -1583,20 +1576,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> // Index parameters. 
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); // prepare callback IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory( - storageComponentProvider.getStorageManager(), splitsAndConstraint.first); - - int numPartitions = getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = getPartitionsMap(numPartitions); + storageComponentProvider.getStorageManager(), partitioningProperties.getSpiltsProvider()); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); IOperatorDescriptor op; if (bulkload) { @@ -1604,18 +1595,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } else if (indexOp == IndexOperation.UPSERT) { int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar); op = new 
LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex, - BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, partitionsMap); + BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory, + partitioningProperties.getComputeStorageMap()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, indexDataFlowFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); } - return new Pair<>(op, splitsAndConstraint.second); + return new Pair<>(op, partitioningProperties.getConstraints()); } catch (Exception e) { throw new AlgebricksException(e); } @@ -1757,8 +1749,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> FullTextUtil.fetchFilterAndCreateConfigEvaluator(this, secondaryIndex.getDataverseName(), secondaryIndexDetails.getFullTextConfigName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + PartitioningProperties partitioningProperties = + getPartitioningProperties(dataset, secondaryIndex.getIndexName()); // Generate Output Record format ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields]; @@ -1800,7 +1792,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, fullTextConfigEvaluatorFactory, docField, keyFields, isPartitioned, true, false, MissingWriterFactory.INSTANCE); - return new Pair<>(tokenizerOp, splitsAndConstraint.second); + return new Pair<>(tokenizerOp, partitioningProperties.getConstraints()); } catch (Exception e) { throw new 
AlgebricksException(e); } @@ -1830,15 +1822,19 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return storageComponentProvider; } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds) - throws AlgebricksException { - return getSplitProviderAndConstraints(ds, ds.getDatasetName()); + public PartitioningProperties getPartitioningProperties(Index idx) throws AlgebricksException { + Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName()); + return getPartitioningProperties(ds, idx.getIndexName()); } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds, - String indexName) throws AlgebricksException { - FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName); - return StoragePathUtil.splitProviderAndPartitionConstraints(splits); + public PartitioningProperties getPartitioningProperties(Dataset ds) throws AlgebricksException { + return getPartitioningProperties(ds, ds.getDatasetName()); + } + + public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException { + //TODO(partitioning) pass splits rather than mdTxnCtx? 
+ // FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName); + return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName); } public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException { @@ -1898,34 +1894,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> validateDatabaseObjectNameImpl(objectName, sourceLoc); } - public int[][] getPartitionsMap(Dataset dataset) throws AlgebricksException { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset); - return getPartitionsMap(getNumPartitions(spPc.second)); - } - - public int[][] getPartitionsMap(Index idx) throws AlgebricksException { - Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - getSplitProviderAndConstraints(ds, idx.getIndexName()); - return getPartitionsMap(getNumPartitions(spPc.second)); - } - - public static int getNumPartitions(AlgebricksPartitionConstraint constraint) { - if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) { - return ((AlgebricksCountPartitionConstraint) constraint).getCount(); - } else { - return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length; - } - } - - public static int[][] getPartitionsMap(int numPartitions) { - int[][] map = new int[numPartitions][1]; - for (int i = 0; i < numPartitions; i++) { - map[i] = new int[] { i }; - } - return map; - } - private void validateDatabaseObjectNameImpl(String name, SourceLocation sourceLoc) throws AlgebricksException { if (name == null || name.isEmpty()) { throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, ""); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java new file mode 100644 index 0000000000..f5e96b1691 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC; +import static org.apache.asterix.common.utils.PartitioningScheme.STATIC; + +import org.apache.asterix.common.cluster.PartitioningProperties; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.dataflow.IDataPartitioningProvider; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class DataPartitioningProvider implements IDataPartitioningProvider { + + private final ICcApplicationContext appCtx; + private final PartitioningScheme scheme; + + public DataPartitioningProvider(ICcApplicationContext appCtx) { + this.appCtx = appCtx; + scheme = appCtx.getStorageProperties().getPartitioningScheme(); + } + + public PartitioningProperties splitAndConstraints(DataverseName dataverseName) { + if (scheme == DYNAMIC) { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil + .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName); + int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); + return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); + } else if (scheme == STATIC) { + throw new NotImplementedException(); + } + throw new IllegalStateException(); + } + + public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds, + String indexName) throws AlgebricksException { + if (scheme == DYNAMIC) { + FileSplit[] splits = + SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = + StoragePathUtil.splitProviderAndPartitionConstraints(splits); + int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second)); + return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap); + } else if (scheme == STATIC) { + throw new NotImplementedException(); + } + throw new IllegalStateException(); + } + + private static int getNumPartitions(AlgebricksPartitionConstraint constraint) { + if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) { + return ((AlgebricksCountPartitionConstraint) constraint).getCount(); + } else { + return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length; + } + } + + private static int[][] getPartitionsMap(int numPartitions) { + int[][] map = new int[numPartitions][1]; + for (int i = 0; i < numPartitions; i++) { + map[i] = new int[] { i }; + } + return map; + } +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 89dea40264..27e06eb8cb 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -31,6 +31,7 @@ import java.util.UUID; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.RecordBuilder; import org.apache.asterix.column.util.ColumnSecondaryIndexSchemaUtil; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; @@ -309,15 +310,14 @@ public class DatasetUtil { return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); } JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); - IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); - IndexDropOperatorDescriptor primaryBtreeDrop = - new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options, partitionsMap); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + IIndexDataflowHelperFactory indexHelperFactory = + new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + partitioningProperties.getSpiltsProvider()); + IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, + options, partitioningProperties.getComputeStorageMap()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop, - splitsAndConstraint.second); + partitioningProperties.getConstraints()); 
specPrimary.addRoot(primaryBtreeDrop); return specPrimary; } @@ -334,14 +334,13 @@ public class DatasetUtil { itemType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(itemType, metaItemType, dataset); JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); - FileSplit[] fs = splitsAndConstraint.first.getFileSplits(); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + FileSplit[] fs = partitioningProperties.getSpiltsProvider().getFileSplits(); StringBuilder sb = new StringBuilder(); for (int i = 0; i < fs.length; i++) { sb.append(fs[i] + " "); } - LOGGER.info("CREATING File Splits: " + sb); + LOGGER.info("CREATING File Splits: {}", sb); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); // prepare a LocalResourceMetadata which will be stored in NC's local resource @@ -350,12 +349,11 @@ public class DatasetUtil { compactionInfo.first, compactionInfo.second); IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), - splitsAndConstraint.first, resourceFactory, true); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); - IndexCreateOperatorDescriptor indexCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap); + partitioningProperties.getSpiltsProvider(), resourceFactory, true); + IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, + partitioningProperties.getComputeStorageMap()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, - splitsAndConstraint.second); + 
partitioningProperties.getConstraints()); spec.addRoot(indexCreateOp); return spec; } @@ -368,16 +366,16 @@ public class DatasetUtil { throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName); } JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); - IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + IIndexDataflowHelperFactory indexHelperFactory = + new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + partitioningProperties.getSpiltsProvider()); LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, indexHelperFactory); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - splitsAndConstraint.second); + partitioningProperties.getConstraints()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - splitsAndConstraint.second); + partitioningProperties.getConstraints()); spec.addRoot(compactOp); return spec; } @@ -398,10 +396,9 @@ public class DatasetUtil { public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ITupleProjectorFactory projectorFactory) throws AlgebricksException { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); - IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first; - AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second; + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + IFileSplitProvider primaryFileSplitProvider = partitioningProperties.getSpiltsProvider(); + AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints(); // -Infinity int[] lowKeyFields = null; // +Infinity @@ -412,11 +409,10 @@ public class DatasetUtil { IRecoveryManager.ResourceType.LSM_BTREE); IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true, indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false, - null, null, projectorFactory, null, partitionsMap); + null, null, projectorFactory, null, partitioningProperties.getComputeStorageMap()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp, primaryPartitionConstraint); return primarySearchOp; @@ -445,8 +441,7 @@ public class DatasetUtil { Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); // prepare callback int[] primaryKeyFields = new int[numKeys]; @@ -462,8 +457,8 @@ public class DatasetUtil { storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields); ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( 
storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields); - IIndexDataflowHelperFactory idfh = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); + IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), + partitioningProperties.getSpiltsProvider()); LSMPrimaryUpsertOperatorDescriptor op; ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; @@ -518,18 +513,15 @@ public class DatasetUtil { ARecordType requestedType = getPrevRecordType(metadataProvider, dataset, itemType); ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory( dataset.getDatasetFormatInfo(), requestedType, itemType, metaItemType, numKeys); - - int numPartitions = MetadataProvider.getNumPartitions(splitsAndConstraint.second); - int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider); - ITuplePartitionerFactory tuplePartitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); - + ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh, missingWriterFactory, modificationCallbackFactory, searchCallbackFactory, dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType, - fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory, partitionsMap); - return new Pair<>(op, splitsAndConstraint.second); + fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory, + partitioningProperties.getComputeStorageMap()); + return new Pair<>(op, 
partitioningProperties.getConstraints()); } /** @@ -591,9 +583,8 @@ public class DatasetUtil { */ public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset, MetadataProvider metadataProvider) throws AlgebricksException { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset); - AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints(); // Build dummy tuple containing one field with a dummy value inside. ArrayTupleBuilder tb = new ArrayTupleBuilder(1); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java index 43b1fb9986..b6f0e9604c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java @@ -19,11 +19,11 @@ package org.apache.asterix.metadata.utils; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.utils.StorageConstants; @@ -114,6 +114,7 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper { private ILSMMergePolicyFactory mergePolicyFactory; private Map<String, String> mergePolicyProperties; private int groupbyNumFrames; + private int[][] computeStorageMap; protected 
SampleOperationsHelper(Dataset dataset, Index index, MetadataProvider metadataProvider, SourceLocation sourceLoc) { @@ -134,11 +135,11 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper { comparatorFactories = dataset.getPrimaryComparatorFactories(metadataProvider, itemType, metaType); groupbyNumFrames = getGroupByNumFrames(metadataProvider, sourceLoc); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); - fileSplitProvider = secondarySplitsAndConstraint.first; - partitionConstraint = secondarySplitsAndConstraint.second; - + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); + fileSplitProvider = partitioningProperties.getSpiltsProvider(); + partitionConstraint = partitioningProperties.getConstraints(); + computeStorageMap = partitioningProperties.getComputeStorageMap(); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); mergePolicyFactory = compactionInfo.first; @@ -153,9 +154,8 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper { mergePolicyFactory, mergePolicyProperties); IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(), fileSplitProvider, resourceFactory, true); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); IndexCreateOperatorDescriptor indexCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap); + new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, computeStorageMap); indexCreateOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitionConstraint); spec.addRoot(indexCreateOp); @@ -318,19 +318,18 @@ public class 
SampleOperationsHelper implements ISecondaryIndexOperationsHelper { protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor, long numElementHint) throws AlgebricksException { - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); int[] pkFields = new int[dataset.getPrimaryKeys().size()]; for (int i = 0; i < pkFields.length; i++) { pkFields[i] = fieldPermutation[i]; } - int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count(); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null, LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory, - partitionsMap); + partitioningProperties.getComputeStorageMap()); treeIndexBulkLoadOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, partitionConstraint); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 02c05a912c..5d6c13c792 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -19,13 +19,13 @@ package org.apache.asterix.metadata.utils; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.OptimizationConfUtil; import org.apache.asterix.common.exceptions.CompilationException; @@ -211,10 +211,10 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); metaSerde = metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); - secondaryFileSplitProvider = secondarySplitsAndConstraint.first; - secondaryPartitionConstraint = secondarySplitsAndConstraint.second; + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); + secondaryFileSplitProvider = partitioningProperties.getSpiltsProvider(); + secondaryPartitionConstraint = partitioningProperties.getConstraints(); numPrimaryKeys = dataset.getPrimaryKeys().size(); if (dataset.getDatasetType() == DatasetType.INTERNAL) { filterFieldName = DatasetUtil.getFilterField(dataset); @@ -223,10 +223,10 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO } else { numFilterFields = 0; } - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = - 
metadataProvider.getSplitProviderAndConstraints(dataset); - primaryFileSplitProvider = primarySplitsAndConstraint.first; - primaryPartitionConstraint = primarySplitsAndConstraint.second; + + PartitioningProperties datasetPartitioningProperties = metadataProvider.getPartitioningProperties(dataset); + primaryFileSplitProvider = datasetPartitioningProperties.getSpiltsProvider(); + primaryPartitionConstraint = datasetPartitioningProperties.getConstraints(); setPrimaryRecDescAndComparators(); } setSecondaryRecDescAndComparators(); @@ -447,16 +447,15 @@ public abstract class SecondaryIndexOperationsHelper implements ISecondaryIndexO throws AlgebricksException { IndexDataflowHelperFactory primaryIndexDataflowHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); - int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count(); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider); - ITuplePartitionerFactory partitionerFactory = - new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions); + ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, + partitioningProperties.getNumberOfPartitions()); // when an index is being created (not loaded) the filtration is introduced in the pipeline -> no tuple filter - LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = - new LSMIndexBulkLoadOperatorDescriptor(spec, secondaryRecDesc, fieldPermutation, fillFactor, false, - numElementsHint, false, dataflowHelperFactory, primaryIndexDataflowHelperFactory, - BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null, partitionerFactory, partitionsMap); + 
LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec, + secondaryRecDesc, fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, + primaryIndexDataflowHelperFactory, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), null, + partitionerFactory, partitioningProperties.getComputeStorageMap()); treeIndexBulkLoadOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, secondaryPartitionConstraint); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java index 4fda6e449c..498f3d4882 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java @@ -23,19 +23,17 @@ import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDes import java.util.Set; +import org.apache.asterix.common.cluster.PartitioningProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.runtime.utils.RuntimeUtils; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; import 
org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory; import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; @@ -60,9 +58,9 @@ public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexO mergePolicyFactory, mergePolicyProperties); IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(), secondaryFileSplitProvider, resourceFactory, true); - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); - IndexCreateOperatorDescriptor secondaryIndexCreateOp = - new IndexCreateOperatorDescriptor(spec, indexBuilderFactory, partitionsMap); + PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); + IndexCreateOperatorDescriptor secondaryIndexCreateOp = new IndexCreateOperatorDescriptor(spec, + indexBuilderFactory, partitioningProperties.getComputeStorageMap()); secondaryIndexCreateOp.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, secondaryPartitionConstraint); @@ -79,17 +77,17 @@ public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexO static JobSpecification buildDropJobSpecImpl(Dataset dataset, Index index, Set<DropOption> dropOptions, MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); - IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); + IIndexDataflowHelperFactory dataflowHelperFactory = + new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + partitioningProperties.getSpiltsProvider()); // The index drop operation should be persistent regardless of temp datasets or permanent dataset. - int[][] partitionsMap = metadataProvider.getPartitionsMap(dataset); - IndexDropOperatorDescriptor btreeDrop = - new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, dropOptions, partitionsMap); + IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, + dropOptions, partitioningProperties.getComputeStorageMap()); btreeDrop.setSourceLocation(sourceLoc); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, - splitsAndConstraint.second); + partitioningProperties.getConstraints()); spec.addRoot(btreeDrop); return spec; } @@ -97,10 +95,11 @@ public abstract class SecondaryTreeIndexOperationsHelper extends SecondaryIndexO @Override public JobSpecification buildCompactJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); - IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); + PartitioningProperties partitioningProperties = + metadataProvider.getPartitioningProperties(dataset, index.getIndexName()); + IIndexDataflowHelperFactory dataflowHelperFactory = + new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), + partitioningProperties.getSpiltsProvider()); LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, dataflowHelperFactory); compactOp.setSourceLocation(sourceLoc); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java index 1776b0970c..9d51420d6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java @@ -211,7 +211,8 @@ public abstract class AbstractBTreeOperatorTest extends AbstractIntegrationTest // load secondary index int[] fieldPermutation = { 3, 0 }; int[][] partitionsMap = TestUtils.getPartitionsMap(1); - int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count(); + int numPartitions = + Arrays.stream(partitionsMap).map(partitions -> partitions.length).mapToInt(Integer::intValue).sum(); ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory(secondaryPKFieldPermutationB, primaryHashFunFactories, numPartitions); TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java index d5ad7da59c..f1dbe5f16a 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java @@ -344,7 +344,8 @@ public abstract class AbstractRTreeOperatorTest extends AbstractIntegrationTest int[] fieldPermutation = { 6, 7, 8, 9, 0 }; int[] pkFields = { 4 }; int[][] partitionsMap = TestUtils.getPartitionsMap(1); - int numPartitions = (int) Arrays.stream(partitionsMap).map(partitions -> partitions.length).count(); + int numPartitions = + Arrays.stream(partitionsMap).map(partitions -> partitions.length).mapToInt(Integer::intValue).sum(); ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, primaryHashFactories, numPartitions); TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad =