http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java index 9b9ab4a..d4f2129 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java @@ -29,6 +29,7 @@ import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.commons.io.FileUtils; import org.kohsuke.args4j.Argument; import org.kohsuke.args4j.CmdLineParser; @@ -58,7 +59,7 @@ public class AsterixCLI { for (String queryFile : options.args) { Reader in = new FileReader(queryFile); AsterixJavaClient ajc = new AsterixJavaClient(integrationUtil.getHyracksClientConnection(), in, - compilationProvider, new DefaultStatementExecutorFactory()); + compilationProvider, new DefaultStatementExecutorFactory(), new StorageComponentProvider()); try { ajc.compile(true, false, false, false, false, true, false); } finally {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java index f05b67d..f44aeb9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixClientDriver.java @@ -25,6 +25,7 @@ import org.apache.asterix.api.java.AsterixJavaClient; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.kohsuke.args4j.CmdLineParser; @@ -64,7 +65,7 @@ public class AsterixClientDriver { ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); FileReader reader = new FileReader(filename); AsterixJavaClient q = new AsterixJavaClient(hcc, reader, compilationProvider, - new DefaultStatementExecutorFactory()); + new DefaultStatementExecutorFactory(), new StorageComponentProvider()); q.compile(optimize, true, true, true, onlyPhysical, createBinaryRuntime, createBinaryRuntime); return q; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java deleted file mode 100644 index 1c05a62..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.file; - -import java.io.File; -import java.rmi.RemoteException; -import java.util.Map; -import java.util.logging.Logger; - -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.dataflow.LSMIndexUtil; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.transactions.IResourceFactory; -import org.apache.asterix.formats.base.IDataFormat; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.Dataverse; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider; -import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; - -public class DatasetOperations { - - private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName()); - - public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt, - MetadataProvider metadataProvider) - throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException { - - String dataverseName = null; - if (datasetDropStmt.getDataverseName() != null) { - dataverseName = datasetDropStmt.getDataverseName(); - } else if (metadataProvider.getDefaultDataverse() != null) { - dataverseName = metadataProvider.getDefaultDataverse().getDataverseName(); - } - - String datasetName = datasetDropStmt.getDatasetName(); - String datasetPath = dataverseName + File.separator + datasetName; - - LOGGER.info("DROP DATASETPATH: " + datasetPath); - - Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); - if (dataset == null) { - throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName); - } - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return JobSpecificationUtils.createJobSpecification(); - } - boolean temp = dataset.getDatasetDetails().isTemp(); - - Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), - dataverseName); - IDataFormat format; - try { - format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); - } catch (Exception e) { - throw new AsterixException(e); - } - - ARecordType itemType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), - dataset.getItemTypeName()); - - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, format.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification(); - - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName, - temp); - StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); - - IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp), LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop, - splitsAndConstraint.second); - - specPrimary.addRoot(primaryBtreeDrop); - - return specPrimary; - } - - public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName, - MetadataProvider metadata) throws AsterixException, AlgebricksException { - String dataverseName = dataverse.getDataverseName(); - IDataFormat format; - try { - format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); - } catch (Exception e) { - throw new AsterixException(e); - } - Dataset dataset = metadata.findDataset(dataverseName, datasetName); - if (dataset == null) { - throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName); - } - boolean temp = dataset.getDatasetDetails().isTemp(); - ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(), - dataset.getItemTypeName()); - // get meta item type - ARecordType metaItemType = null; - if (dataset.hasMetaPart()) { - metaItemType = (ARecordType) metadata.findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); - } - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, - itemType, metaItemType, format.getBinaryComparatorFactoryProvider()); - ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType); - int[] bloomFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset); - - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, format.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata - .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp); - FileSplit[] fs = splitsAndConstraint.first.getFileSplits(); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < fs.length; i++) { - sb.append(fs[i] + " "); - } - LOGGER.info("CREATING File Splits: " + sb.toString()); - - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadata.getMetadataTxnContext()); - StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(typeTraits, - comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first, - compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields); - ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( - localResourceMetadata, LocalResource.LSMBTreeResource); - - TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp), - localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil - .getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, - splitsAndConstraint.second); - spec.addRoot(indexCreateOp); - return spec; - } - - public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName, - MetadataProvider metadata) throws AsterixException, AlgebricksException { - String dataverseName = dataverse.getDataverseName(); - IDataFormat format; - try { - format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); - } catch (Exception e) { - throw new AsterixException(e); - } - Dataset dataset = metadata.findDataset(dataverseName, datasetName); - if (dataset == null) { - throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName); - } - boolean temp = dataset.getDatasetDetails().isTemp(); - ARecordType itemType = (ARecordType) metadata.findType(dataset.getItemTypeDataverseName(), - dataset.getItemTypeName()); - ARecordType metaItemType = DatasetUtils.getMetaType(metadata, dataset); - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, - itemType, metaItemType, format.getBinaryComparatorFactoryProvider()); - ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType, metaItemType); - int[] blooFilterKeyFields = DatasetUtils.createBloomFilterKeyFields(dataset); - ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, - itemType, format.getBinaryComparatorFactoryProvider()); - int[] filterFields = DatasetUtils.createFilterFields(dataset); - int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata - .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp); - StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadata.getMetadataTxnContext()); - LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories, - btreeFields, filterFields, !temp), - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - splitsAndConstraint.second); - - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - splitsAndConstraint.second); - spec.addRoot(compactOp); - return spec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java deleted file mode 100644 index 4f9af13..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.file; - -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataverse; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; - -public class DataverseOperations { - public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) { - JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification(); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata - .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName()); - FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second); - jobSpec.addRoot(frod); - return jobSpec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java deleted file mode 100644 index d1c0ae3..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.file; - -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.config.OptimizationConfUtil; -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.dataflow.LSMIndexUtil; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.metadata.MetadataException; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; - -public class IndexOperations { - - private static final PhysicalOptimizationConfig physicalOptimizationConfig = - OptimizationConfUtil.getPhysicalOptimizationConfig(); - - public static JobSpecification buildSecondaryIndexCreationJobSpec(CompiledCreateIndexStatement createIndexStmt, - ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType, - MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { - SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper - .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(), - createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(), - createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(), - createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider, - physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType); - return secondaryIndexHelper.buildCreationJobSpec(); - } - - public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt, - ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType, - MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { - SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper - .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(), - createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(), - createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(), - createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider, - physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType); - return secondaryIndexHelper.buildLoadingJobSpec(); - } - - public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt, - ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType, - MetadataProvider metadataProvider, List<ExternalFile> files) - throws AsterixException, AlgebricksException { - SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper - .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(), - createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(), - createIndexStmt.getKeyFields(), createIndexStmt.getKeyFieldTypes(), - createIndexStmt.isEnforced(), createIndexStmt.getGramLength(), metadataProvider, - physicalOptimizationConfig, recType, metaType, keySourceIndicators, enforcedType); - secondaryIndexHelper.setExternalFiles(files); - return secondaryIndexHelper.buildLoadingJobSpec(); - } - - public static JobSpecification buildDropSecondaryIndexJobSpec(CompiledIndexDropStatement indexDropStmt, - MetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException { - String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName() - : indexDropStmt.getDataverseName(); - String datasetName = indexDropStmt.getDatasetName(); - String indexName = indexDropStmt.getIndexName(); - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - boolean temp = dataset.getDatasetDetails().isTemp(); - - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp); - StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = - DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); - - // The index drop operation should be persistent regardless of temp datasets or permanent dataset. - IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp), - LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, - splitsAndConstraint.second); - spec.addRoot(btreeDrop); - - return spec; - } - - public static JobSpecification buildSecondaryIndexCompactJobSpec(CompiledIndexCompactStatement indexCompactStmt, - ARecordType recType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType, - MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { - SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper - .createIndexOperationsHelper(indexCompactStmt.getIndexType(), indexCompactStmt.getDataverseName(), - indexCompactStmt.getDatasetName(), indexCompactStmt.getIndexName(), - indexCompactStmt.getKeyFields(), indexCompactStmt.getKeyTypes(), indexCompactStmt.isEnforced(), - indexCompactStmt.getGramLength(), metadataProvider, physicalOptimizationConfig, recType, - metaType, keySourceIndicators, enforcedType); - return secondaryIndexHelper.buildCompactJobSpec(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java deleted file mode 100644 index 68ab182..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.file; - -import org.apache.asterix.common.config.CompilerProperties; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.hyracks.api.job.JobSpecification; - -public class JobSpecificationUtils { - public static JobSpecification createJobSpecification() { - CompilerProperties compilerProperties = AppContextInfo.INSTANCE.getCompilerProperties(); - int frameSize = compilerProperties.getFrameSize(); - JobSpecification spec = new JobSpecification(frameSize); - return spec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java deleted file mode 100644 index 3a9a72b..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.file; - -import java.util.List; - -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.dataflow.LSMIndexUtil; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; -import org.apache.hyracks.algebricks.data.ITypeTraitProvider; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; - -public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelper { - - protected SecondaryBTreeOperationsHelper(PhysicalOptimizationConfig physOptConf, - IPropertiesProvider propertiesProvider) { - super(physOptConf, propertiesProvider); - } - - @Override - public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - ILocalResourceFactoryProvider localResourceFactoryProvider; - IIndexDataflowHelperFactory indexDataflowHelperFactory; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - LSMBTreeLocalResourceMetadataFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory( - secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, false, dataset.getDatasetId(), - mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, filterCmpFactories, - secondaryBTreeFields, secondaryFilterFields); - localResourceFactoryProvider = - new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource); - indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory( - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories, - secondaryBTreeFields, secondaryFilterFields, !dataset.getDatasetDetails().isTemp()); - } else { - // External dataset local resource and dataflow helper - int[] buddyBreeFields = new int[] { numSecondaryKeys }; - ExternalBTreeWithBuddyLocalResourceMetadataFactory localResourceMetadata = - new ExternalBTreeWithBuddyLocalResourceMetadataFactory( - dataset.getDatasetId(), secondaryComparatorFactories, secondaryTypeTraits, mergePolicyFactory, - mergePolicyFactoryProperties, buddyBreeFields); - localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata, - LocalResource.ExternalBTreeWithBuddyResource); - indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), buddyBreeFields, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - } - TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, - secondaryBloomFilterKeyFields, indexDataflowHelperFactory, localResourceFactoryProvider, - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, - secondaryPartitionConstraint); - spec.addRoot(secondaryIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numSecondaryKeys); - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - /* - * In case of external data, this method is used to build loading jobs for both initial load on index creation - * and transaction load on dataset referesh - */ - - // Create external indexing scan operator - ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec); - - // Assign op. - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { - sourceOp = createCastOp(spec, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = - createExternalAssignOp(spec, numSecondaryKeys, secondaryRecDesc); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc); - } - - // Sort by secondary keys. - ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - // Create secondary BTree bulk load op. - AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp; - ExternalBTreeWithBuddyDataflowHelperFactory dataflowHelperFactory = - new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - IOperatorDescriptor root; - if (externalFiles != null) { - // Transaction load - secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - root = secondaryBulkLoadOp; - } else { - // Initial load - secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, - new RecordDescriptor[] { secondaryRecDesc }); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - root = metaOp; - } - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.addRoot(root); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } else { - // Create dummy key provider for feeding the primary index scan. - AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); - - // Create primary index scan op. - BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); - - // Assign op. - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { - sourceOp = createCastOp(spec, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = - createAssignOp(spec, sourceOp, numSecondaryKeys, secondaryRecDesc); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc); - } - - // Sort by secondary keys. - ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - // Create secondary BTree bulk load op. - TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, - filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, !temp), - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, - new RecordDescriptor[] { secondaryRecDesc }); - // Connect the operators. - spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - spec.addRoot(metaOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - } - - @Override - protected int getNumSecondaryKeys() { - return numSecondaryKeys; - } - - @Override - public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - LSMTreeIndexCompactOperatorDescriptor compactOp; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, - filterCmpFactories, secondaryBTreeFields, secondaryFilterFields, !temp), - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - } else { - // External dataset - compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits, - secondaryComparatorFactories, secondaryBloomFilterKeyFields, - new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numSecondaryKeys }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true), - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - } - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondaryPartitionConstraint); - spec.addRoot(compactOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - @Override - @SuppressWarnings("rawtypes") - protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields, - List<IAType> secondaryKeyTypes, int gramLength, MetadataProvider metadataProvider) - throws AlgebricksException, AsterixException { - secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields]; - secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys]; - secondaryBloomFilterKeyFields = new int[numSecondaryKeys]; - ISerializerDeserializer[] secondaryRecFields = - new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields]; - ISerializerDeserializer[] enforcedRecFields = - new ISerializerDeserializer[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields]; - ITypeTraits[] enforcedTypeTraits = - new ITypeTraits[1 + numPrimaryKeys + (dataset.hasMetaPart() ? 1 : 0) + numFilterFields]; - secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; - ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider(); - ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider(); - IBinaryComparatorFactoryProvider comparatorFactoryProvider = - metadataProvider.getFormat().getBinaryComparatorFactoryProvider(); - // Record column is 0 for external datasets, numPrimaryKeys for internal ones - int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0; - for (int i = 0; i < numSecondaryKeys; i++) { - ARecordType sourceType; - int sourceColumn; - if (keySourceIndicators == null || keySourceIndicators.get(i) == 0) { - sourceType = itemType; - sourceColumn = recordColumn; - } else { - sourceType = metaType; - sourceColumn = recordColumn + 1; - } - secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory( - isEnforcingKeyTypes ? enforcedItemType : sourceType, secondaryKeyFields.get(i), sourceColumn); - Pair<IAType, Boolean> keyTypePair = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyFields.get(i), sourceType); - IAType keyType = keyTypePair.first; - anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second; - ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType); - secondaryRecFields[i] = keySerde; - secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true); - secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType); - secondaryBloomFilterKeyFields[i] = i; - } - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - // Add serializers and comparators for primary index fields. - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i]; - enforcedRecFields[i] = primaryRecDesc.getFields()[i]; - secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i]; - enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i]; - secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i]; - } - } else { - // Add serializers and comparators for RID fields. - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numSecondaryKeys + i] = IndexingConstants.getSerializerDeserializer(i); - enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i); - secondaryTypeTraits[numSecondaryKeys + i] = IndexingConstants.getTypeTraits(i); - enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i); - secondaryComparatorFactories[numSecondaryKeys + i] = IndexingConstants.getComparatorFactory(i); - } - } - enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType); - enforcedTypeTraits[numPrimaryKeys] = typeTraitProvider.getTypeTrait(itemType); - if (dataset.hasMetaPart()) { - enforcedRecFields[numPrimaryKeys + 1] = serdeProvider.getSerializerDeserializer(metaType); - enforcedTypeTraits[numPrimaryKeys + 1] = typeTraitProvider.getTypeTrait(metaType); - } - - if (numFilterFields > 0) { - secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat() - .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys); - Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); - IAType type = keyTypePair.first; - ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type); - secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde; - enforcedRecFields[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = serde; - enforcedTypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)] = - typeTraitProvider.getTypeTrait(type); - } - secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits); - enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); - - } - - protected int[] createFieldPermutationForBulkLoadOp(int numSecondaryKeyFields) { - int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields]; - for (int i = 0; i < fieldPermutation.length; i++) { - fieldPermutation[i] = i; - } - return fieldPermutation; - } -}
