http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 7238db9..1beaed0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -30,24 +30,17 @@ import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.context.ITransactionSubsystemProvider; -import org.apache.asterix.common.context.TransactionSubsystemProvider; import org.apache.asterix.common.dataflow.IApplicationContextInfo; -import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.api.IDataSourceAdapter; @@ -55,9 +48,9 @@ import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor; import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor; import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; import org.apache.asterix.external.provider.AdapterFactoryProvider; import org.apache.asterix.external.util.ExternalDataConstants; @@ -79,29 +72,26 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity; import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMInvertedIndexUpsertOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMTreeUpsertOperatorDescriptor; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.asterix.runtime.util.ClusterStateManager; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory; @@ -148,6 +138,7 @@ import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeseri import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; @@ -155,22 +146,18 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory; import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; public class MetadataProvider implements IMetadataProvider<DataSourceId, String> { + private final IStorageComponentProvider storaegComponentProvider; + private final ITransactionSubsystemProvider txnSubsystemProvider; + private final IMetadataPageManagerFactory metadataPageManagerFactory; + private final IPrimitiveValueProviderFactory primitiveValueProviderFactory; private final StorageProperties storageProperties; private final ILibraryManager libraryManager; private final Dataverse defaultDataverse; @@ -188,10 +175,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private boolean isTemporaryDatasetWriteJob = true; private boolean blockingOperatorDisabled = false; - public MetadataProvider(Dataverse defaultDataverse) { + public MetadataProvider(Dataverse defaultDataverse, IStorageComponentProvider componentProvider) { this.defaultDataverse = defaultDataverse; - this.storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - this.libraryManager = AppContextInfo.INSTANCE.getLibraryManager(); + this.storaegComponentProvider = componentProvider; + storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); + libraryManager = AppContextInfo.INSTANCE.getLibraryManager(); + txnSubsystemProvider = componentProvider.getTransactionSubsystemProvider(); + metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory(); + primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory(); } public String getPropertyValue(String propertyName) { @@ -408,8 +399,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan( JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc) throws AlgebricksException { - ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc, - adapterFactory); + ExternalScanOperatorDescriptor dataScanner = + new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory); AlgebricksPartitionConstraint constraint; try { constraint = adapterFactory.getPartitionConstraint(); @@ -437,8 +428,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime( JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor policyAccessor) throws Exception { Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput; - factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx, - libraryManager); + factoryOutput = + FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, mdTxnCtx, libraryManager); ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, primaryFeed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME); IAdapterFactory adapterFactory = factoryOutput.first; @@ -454,6 +445,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second); break; + default: + break; } AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint(); @@ -461,10 +454,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec, - List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, - JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, - int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive, - Object implConfig, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException { + IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, + boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, + boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, + int[] maxFilterFieldIndexes) throws AlgebricksException { boolean isSecondary = true; int numSecondaryKeys = 0; try { @@ -474,43 +467,38 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) { isSecondary = !indexName.equals(primaryIndex.getIndexName()); } - int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size(); + Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), + dataset.getDatasetName(), indexName) : primaryIndex; + int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); int[] bloomFilterKeyFields; ITypeTraits[] typeTraits; IBinaryComparatorFactory[] comparatorFactories; - ARecordType itemType = (ARecordType) this.findType(dataset.getItemTypeDataverseName(), - dataset.getItemTypeName()); + ARecordType itemType = + (ARecordType) this.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); ARecordType metaType = null; List<Integer> primaryKeyIndicators = null; if (dataset.hasMetaPart()) { - metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); + metaType = + (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); primaryKeyIndicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); } - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = null; - int[] btreeFields = null; - + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType); + int[] filterFields; + int[] btreeFields; if (isSecondary) { - Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), - dataset.getDatasetName(), indexName); - numSecondaryKeys = secondaryIndex.getKeyFieldNames().size(); + numSecondaryKeys = theIndex.getKeyFieldNames().size(); bloomFilterKeyFields = new int[numSecondaryKeys]; for (int i = 0; i < numSecondaryKeys; i++) { bloomFilterKeyFields[i] = i; } Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits = - getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex( - secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(), - DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(), - dataset.hasMetaPart(), primaryKeyIndicators, - secondaryIndex.getKeyFieldSourceIndicators(), - metaType); + getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(theIndex.getKeyFieldNames(), + theIndex.getKeyFieldTypes(), DatasetUtil.getPartitioningKeys(dataset), itemType, + dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators, + theIndex.getKeyFieldSourceIndicators(), metaType); comparatorFactories = comparatorFactoriesAndTypeTraits.first; typeTraits = comparatorFactoriesAndTypeTraits.second; if (filterTypeTraits != null) { @@ -528,77 +516,58 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> bloomFilterKeyFields[i] = i; } // get meta item type - ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset); - typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType); - comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType, + ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); + typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); + comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); - filterFields = DatasetUtils.createFilterFields(dataset); - btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); } IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc; - spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), dataset.getDatasetName(), - indexName, temp); + spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), indexName, + temp); ISearchOperationCallbackFactory searchCallbackFactory; if (isSecondary) { searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory(); } else { - JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(); int datasetId = dataset.getDatasetId(); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; } - ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider(); - /** * Due to the read-committed isolation level, * we may acquire very short duration lock(i.e., instant lock) for readers. */ searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields, - txnSubsystemProvider, ResourceType.LSM_BTREE); + : new PrimaryIndexInstantSearchOperationCallbackFactory( + ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(), + datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); } - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER; BTreeSearchOperatorDescriptor btreeSearchOp; if (dataset.getDatasetType() == DatasetType.INTERNAL) { btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, + appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, + typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()) - : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits, - filterCmpFactories, btreeFields, filterFields, !temp), + dataset.getIndexDataflowHelperFactory(this, theIndex, itemType, metaType, compactionInfo.first, + compactionInfo.second), retainInput, retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, - minFilterFieldIndexes, maxFilterFieldIndexes, LSMIndexUtil - .getMetadataPageManagerFactory()); + minFilterFieldIndexes, maxFilterFieldIndexes, metadataPageManagerFactory); } else { - // External dataset <- use the btree with buddy btree-> - // Be Careful of Key Start Index ? - int[] buddyBreeFields = new int[] { numSecondaryKeys }; - ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = - new ExternalBTreeWithBuddyDataflowHelperFactory( - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields, - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp); + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, + theIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider, rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput, - retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, LSMIndexUtil - .getMetadataPageManagerFactory()); + retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, + metadataPageManagerFactory); } return new Pair<>(btreeSearchOp, spPc.second); } catch (MetadataException me) { @@ -611,8 +580,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException { try { - ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); - int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size(); + ARecordType recType = + (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); boolean temp = dataset.getDatasetDetails().isTemp(); Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), @@ -629,19 +599,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> "Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. " + "There can be only one field as a key for the R-tree index."); } - Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), - secondaryKeyFields.get(0), recType); + Pair<IAType, Boolean> keyTypePair = + Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType); IAType keyType = keyTypePair.first; if (keyType == null) { throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); } int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag()); - boolean isPointMBR = keyType.getTypeTag() == ATypeTag.POINT || keyType.getTypeTag() == ATypeTag.POINT3D; int numNestedSecondaryKeyFields = numDimensions * 2; IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; for (int i = 0; i < numNestedSecondaryKeyFields; i++) { - valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE; + valueProviderFactories[i] = primitiveValueProviderFactory; } RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); @@ -655,27 +624,24 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), - dataset.getDatasetName(), indexName, temp); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints( + dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp); ARecordType metaType = null; if (dataset.hasMetaPart()) { - metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); + metaType = + (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); } - IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories( + IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories( dataset, recType, metaType, context.getBinaryComparatorFactoryProvider()); int[] btreeFields = new int[primaryComparatorFactories.length]; for (int i = 0; i < btreeFields.length; i++) { btreeFields[i] = i + numNestedSecondaryKeyFields; } - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - recType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = null; - int[] rtreeFields = null; + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType); + int[] filterFields; + int[] rtreeFields; if (filterTypeTraits != null) { filterFields = new int[1]; filterFields[0] = numNestedSecondaryKeyFields + numPrimaryKeys; @@ -684,46 +650,26 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> rtreeFields[i] = i; } } - - IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag()); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); - ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new SecondaryIndexSearchOperationCallbackFactory(); - + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + ISearchOperationCallbackFactory searchCallbackFactory = + temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory(); RTreeSearchOperatorDescriptor rtreeSearchOp; + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, + secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second); if (dataset.getDatasetType() == DatasetType.INTERNAL) { - IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories( - comparatorFactories, primaryComparatorFactories); - IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory( - valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories, - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first, - compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length), - rtreeFields, filterTypeTraits, filterCmpFactories, filterFields, !temp, isPointMBR); rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - spPc.first, typeTraits, comparatorFactories, keyFields, idff, retainInput, retainMissing, - context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, - maxFilterFieldIndexes, LSMIndexUtil.getMetadataPageManagerFactory()); + appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, + typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput, + retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, + maxFilterFieldIndexes, metadataPageManagerFactory); } else { - // External Dataset - ExternalRTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory( - valueProviderFactories, RTreePolicyType.RTREE, - IndexingConstants.getBuddyBtreeComparatorFactories(), compactionInfo.first, - compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length), - getStorageProperties().getBloomFilterFalsePositiveRate(), - new int[] { numNestedSecondaryKeyFields }, - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp, isPointMBR); // Create the operator rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput, - retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, LSMIndexUtil - .getMetadataPageManagerFactory()); + appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), spPc.first, + typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput, + retainMissing, context.getMissingWriterFactory(), searchCallbackFactory, + metadataPageManagerFactory); } return new Pair<>(rtreeSearchOp, spPc.second); @@ -741,8 +687,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> File outFile = new File(fs.getPath()); String nodeId = fs.getNodeName(); - SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, - getWriterFactory(), inputDesc); + SinkWriterRuntimeFactory runtime = + new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc); AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId }); return new Pair<>(runtime, apc); } @@ -776,7 +722,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); int numKeys = keys.size(); - int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1; + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; // move key fields to front int[] fieldPermutation = new int[numKeys + 1 + numFilterFields]; @@ -797,56 +743,40 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> try { boolean temp = dataset.getDatasetDetails().isTemp(); isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; - Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); String indexName = primaryIndex.getIndexName(); - - ARecordType metaType = null; - if (dataset.hasMetaPart()) { - metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); - } - + ARecordType metaType = dataset.hasMetaPart() + ? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()) + : null; String itemTypeName = dataset.getItemTypeName(); ARecordType itemType = (ARecordType) MetadataManager.INSTANCE .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); - ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, null); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, + ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, null); + IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaType, context.getBinaryComparatorFactoryProvider()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), - datasetName, indexName, temp); + getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName, + temp); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); - long numElementsHint = getCardinalityPerPartitionHint(dataset); - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - // TODO // figure out the right behavior of the bulkload and then give the // right callback // (ex. what's the expected behavior when there is an error during // bulkload?) - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); - TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, - filterCmpFactories, btreeFields, filterFields, !temp), LSMIndexUtil - .getMetadataPageManagerFactory()); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = + new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(), + appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, + comparatorFactories, bloomFilterKeyFields, fieldPermutation, + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, + numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType, + metaType, compactionInfo.first, compactionInfo.second), + metadataPageManagerFactory); return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); } catch (MetadataException me) { throw new AlgebricksException(me); @@ -957,10 +887,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> * @param dataset * @return Number of elements that will be used to create a bloom filter per * dataset per partition - * @throws MetadataException * @throws AlgebricksException */ - public long getCardinalityPerPartitionHint(Dataset dataset) throws MetadataException, AlgebricksException { + public long getCardinalityPerPartitionHint(Dataset dataset) throws AlgebricksException { String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME); long numElementsHint; if (numElementsHintString == null) { @@ -969,34 +898,32 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> numElementsHint = Long.parseLong(numElementsHintString); } int numPartitions = 0; - List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()) - .getNodeNames(); + List<String> nodeGroup = + MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); for (String nd : nodeGroup) { numPartitions += ClusterStateManager.INSTANCE.getNodePartitionsCount(nd); } - numElementsHint = numElementsHint / numPartitions; - return numElementsHint; + return numElementsHint / numPartitions; } protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName, - Map<String, String> configuration, ARecordType itemType, boolean isPKAutoGenerated, - List<List<String>> primaryKeys, ARecordType metaType) throws AlgebricksException { + Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AlgebricksException { try { configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName()); IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(libraryManager, adapterName, configuration, itemType, metaType); // check to see if dataset is indexed - Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), - dataset.getDatasetName(), - dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); + Index filesIndex = + MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), + dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); if (filesIndex != null && filesIndex.getPendingOp() == 0) { // get files List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); Iterator<ExternalFile> iterator = files.iterator(); while (iterator.hasNext()) { - if (iterator.next().getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) { + if (iterator.next().getPendingOp() != ExternalFilePendingOp.NO_OP) { iterator.remove(); } } @@ -1018,27 +945,27 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> numKeyFields / 2); } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset( - String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { + public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(String dataverseName, + String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } - public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse( - String dataverse) { - return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForDataverse(dataverse); + public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) { + return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse); } public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException { - return SplitsAndConstraintsUtil.splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); + return SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) throws MetadataException { DatasourceAdapter adapter; // search in default namespace (built-in adapter) - adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); + adapter = + MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); // search in dataverse (user-defined adapter) if (adapter == null) { @@ -1052,61 +979,61 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex( - String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException { - return SplitsAndConstraintsUtil.splitProviderAndPartitionConstraintsForFilesIndex(mdTxnCtx, dataverseName, - datasetName, targetIdxName, create); + String dataverseName, String datasetName, String targetIdxName, boolean create) + throws AlgebricksException { + return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName, + targetIdxName, create); } public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( - JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput, - IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema, - JobGenContext context, MetadataProvider metadataProvider, boolean retainMissing) - throws AlgebricksException { + JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput, + IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context, + MetadataProvider metadataProvider, boolean retainMissing) throws AlgebricksException { try { // Get data type - IAType itemType; - itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), - dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype(); - - // Create the adapter factory <- right now there is only one. if there are more in the future, we can create - // a map-> + ARecordType itemType = + (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype(); + ARecordType metaType = null; + if (dataset.hasMetaPart()) { + metaType = + (ARecordType) MetadataManager.INSTANCE + .getDatatype(metadataProvider.getMetadataTxnContext(), + dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()) + .getDatatype(); + } ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory(libraryManager, - datasetDetails.getProperties(), (ARecordType) itemType, ridIndexes, retainInput, retainMissing, - context.getMissingWriterFactory()); + LookupAdapterFactory<?> adapterFactory = + AdapterFactoryProvider.getLookupAdapterFactory(libraryManager, datasetDetails.getProperties(), + itemType, ridIndexes, retainInput, retainMissing, context.getMissingWriterFactory()); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo; try { - compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); } catch (MetadataException e) { throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e); } boolean temp = datasetDetails.isTemp(); + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), + dataset.getDatasetName(), fileIndexName); // Create the file index data flow helper - ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory( - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp); - + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, + fileIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); // Create the out record descriptor, appContext and fileSplitProvider for the files index RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc; spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), - dataset.getDatasetName(), - dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false); - ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new SecondaryIndexSearchOperationCallbackFactory(); + dataset.getDatasetName(), fileIndexName, false); + ISearchOperationCallbackFactory searchOpCallbackFactory = + temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory(); // Create the operator ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(), - appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(), - metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory, - retainMissing, context.getMissingWriterFactory(), LSMIndexUtil - .getMetadataPageManagerFactory()); + appContext.getStorageManager(), spPc.first, searchOpCallbackFactory, retainMissing, + context.getMissingWriterFactory(), metadataPageManagerFactory); return new Pair<>(op, spPc.second); } catch (Exception e) { throw new AlgebricksException(e); @@ -1129,7 +1056,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; int numKeys = primaryKeys.size(); - int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1; + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size(); // Move key fields to front. [keys, record, filters] int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields]; @@ -1156,7 +1083,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> fieldPermutation[i++] = idx; } } - try { Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); @@ -1166,15 +1092,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> String itemTypeDataverseName = dataset.getItemTypeDataverseName(); ARecordType itemType = (ARecordType) MetadataManager.INSTANCE .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype(); - ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset); - ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType); + ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); + ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName, - indexName, temp); - + getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName, + temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); int datasetId = dataset.getDatasetId(); @@ -1183,13 +1108,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> primaryKeyFields[i] = i; } - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - - TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE) @@ -1199,18 +1117,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory( jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory( - new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType, + metaItemType, compactionInfo.first, compactionInfo.second); LSMTreeUpsertOperatorDescriptor op; - ITypeTraits[] outputTypeTraits = new ITypeTraits[recordDesc.getFieldCount() - + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; + ITypeTraits[] outputTypeTraits = + new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; ISerializerDeserializer[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields]; @@ -1220,23 +1134,23 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> f++; // add the previous meta second if (dataset.hasMetaPart()) { - outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer( - metaItemType); + outputSerDes[f] = + FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType); outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType); f++; } // add the previous filter third int fieldIdx = -1; if (numFilterFields > 0) { - String filterField = DatasetUtils.getFilterField(dataset).get(0); + String filterField = DatasetUtil.getFilterField(dataset).get(0); for (i = 0; i < itemType.getFieldNames().length; i++) { if (itemType.getFieldNames()[i].equals(filterField)) { break; } } fieldIdx = i; - outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType - .getFieldTypes()[fieldIdx]); + outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider() + .getTypeTrait(itemType.getFieldTypes()[fieldIdx]); outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider() .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]); f++; @@ -1247,11 +1161,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits); - op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, - idfh, null, true, indexName, context.getMissingWriterFactory(), modificationCallbackFactory, - searchCallbackFactory, null, LSMIndexUtil.getMetadataPageManagerFactory()); + op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, appContext.getStorageManager(), + appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, + comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName, + context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null, + metadataPageManagerFactory); op.setType(itemType); op.setFilterIndex(fieldIdx); return new Pair<>(op, splitsAndConstraint.second); @@ -1271,8 +1185,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ISerializerDeserializer<?> payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType); RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde }); - ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, scannerDesc, - adapterFactory); + ExternalScanOperatorDescriptor dataScanner = + new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory); AlgebricksPartitionConstraint constraint; try { @@ -1298,12 +1212,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> int i = 0; for (; i < sidxKeyFieldCount; ++i) { - Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), - sidxKeyFieldNames.get(i), - (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType); + Pair<IAType, Boolean> keyPairType = + Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i), + (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType); IAType keyType = keyPairType.first; - comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, - true); + comparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); } @@ -1325,8 +1239,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } catch (AsterixException e) { throw new AlgebricksException(e); } - comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, - true); + comparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); } @@ -1340,13 +1254,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException { String datasetName = dataSource.getId().getDatasourceName(); - Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), - datasetName); + Dataset dataset = + MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), datasetName); boolean temp = dataset.getDatasetDetails().isTemp(); isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; int numKeys = keys.size(); - int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1; + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; // Move key fields to front. int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())]; @@ -1375,60 +1289,51 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> dataset.getDatasetName(), dataset.getDatasetName()); String indexName = primaryIndex.getIndexName(); ARecordType itemType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype(); - ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset); - ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType); + .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()) + .getDatatype(); + ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); + ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName, - indexName, temp); + getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName, + temp); // prepare callback - JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); int datasetId = dataset.getDatasetId(); int[] primaryKeyFields = new int[numKeys]; for (i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; } - - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - - TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, + ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory( + ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE) - : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields, - txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart()); - - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory( - new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp); + : new PrimaryIndexModificationOperationCallbackFactory( + ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId, + primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, + dataset.hasMetaPart()); + + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType, + metaItemType, compactionInfo.first, compactionInfo.second); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(), + op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, LSMIndexUtil - .getMetadataPageManagerFactory()); + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, + metadataPageManagerFactory); } else { - op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, - fieldPermutation, indexOp, idfh, null, true, indexName, null, modificationCallbackFactory, - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), + appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, + comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true, + indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (MetadataException me) { @@ -1496,7 +1401,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp; int numKeys = primaryKeys.size() + secondaryKeys.size(); - int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1; + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; // generate field permutations int[] fieldPermutation = new int[numKeys + numFilterFields]; @@ -1546,22 +1451,23 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } String itemTypeName = dataset.getItemTypeName(); - IAType itemType; + ARecordType itemType; try { - itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName) - .getDatatype(); + itemType = (ARecordType) MetadataManager.INSTANCE + .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); validateRecordType(itemType); - ARecordType recType = (ARecordType) itemType; + ARecordType metaType = dataset.hasMetaPart() + ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx, + dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype() + : null; // Index parameters. Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), indexName); - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - recType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = null; - int[] btreeFields = null; + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType); + int[] filterFields; + int[] btreeFields; if (filterTypeTraits != null) { filterFields = new int[1]; filterFields[0] = numKeys; @@ -1577,73 +1483,60 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys]; for (i = 0; i < secondaryKeys.size(); ++i) { Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), - secondaryKeyNames.get(i), recType); + secondaryKeyNames.get(i), itemType); IAType keyType = keyPairType.first; - comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, - true); + comparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); } - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset); + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); for (List<String> partitioningKey : partitioningKeys) { - IAType keyType = recType.getSubFieldType(partitioningKey); - comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, - true); + IAType keyType = itemType.getSubFieldType(partitioningKey); + comparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); ++i; } IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp); + getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); int datasetId = dataset.getDatasetId(); - TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE) + modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, + ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, - dataset.hasMetaPart()); + modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, + ResourceType.LSM_BTREE, dataset.hasMetaPart()); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils - .getMergePolicyFactory(dataset, mdTxnCtx); - IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory( - new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); + IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType, + metaType, compactionInfo.first, compactionInfo.second); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); - op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManagerInterface(), + op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, LSMIndexUtil - .getMetadataPageManagerFactory()); + GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, + metadataPageManagerFactory); } else if (indexOp == IndexOperation.UPSERT) { - op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, - fieldPermutation, idfh, filterFactory, false, indexName, null, modificationCallbackFactory, - NoOpOperationCallbackFactory.INSTANCE, prevFieldPermutation, LSMIndexUtil - .getMetadataPageManagerFactory()); + op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), + appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, + comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false, + indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + prevFieldPermutation, metadataPageManagerFactory); } else { - op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, - appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, - fieldPermutation, indexOp, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId), - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, - filterCmpFactories, btreeFields, filterFields, !temp), - filterFactory, false, indexName, null, modificationCallbackFactory, - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); + op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), + appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, + comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory, + false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); } catch (Exception e) { @@ -1672,11 +1565,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> dataset.getDatasetName(), indexName); List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames(); List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); - Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), - secondaryKeyExprs.get(0), recType); + Pair<IAType, Boolean> keyPairType = + Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType); IAType spatialType = keyPairType.first; - boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT - || spatialType.getTypeTag() == ATypeTag.POINT3D; int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); int numSecondaryKeys = dimension * 2; int numPrimaryKeys = primaryKeys.size(); @@ -1684,7 +1575,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ITypeTraits[] typeTraits = new ITypeTraits[numKeys]; IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys]; - int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1; + int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; int[] fieldPermutation = new int[numKeys + numFilterFields]; int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; int i = 0; @@ -1735,33 +1626,31 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys]; for (i = 0; i < numSecondaryKeys; i++) { - comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE - .getBinaryComparatorFactory(nestedKeyType, true); + comparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType); - valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE; + valueProviderFactories[i] = primitiveValueProviderFactory; } - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset); + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); for (List<String> partitioningKey : partitioningKeys) { IAType keyType = recType.getSubFieldType(partitioningKey); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); ++i; } - ARecordType metaItemType = DatasetUtils.getMetaType(this, dataset); - IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories( + ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset); + IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories( dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider()); IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp); + getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp); int[] btreeFields = new int[primaryComparatorFactories.length]; for (int k = 0; k < btreeFields.length; k++) { btreeFields[k] = k + numSecondaryKeys; } - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - recType, context.getBinaryComparatorFactoryProvider()); - int[] filterFields = null; - int[] rtreeFields = null; + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, recType); + int[] filterFields; + int[] rtreeFields;
<TRUNCATED>
