http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java new file mode 100644 index 0000000..80792b5 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import java.io.DataOutput; +import java.io.File; +import java.rmi.RemoteException; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.asterix.builders.IARecordBuilder; +import org.apache.asterix.builders.RecordBuilder; +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.transactions.IResourceFactory; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.formats.base.IDataFormat; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.CompactionPolicy; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Dataverse; +import org.apache.asterix.metadata.entities.ExternalDatasetDetails; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.om.base.AMutableString; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; +import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; +import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; +import org.apache.hyracks.storage.common.file.LocalResource; + +public class DatasetUtil { + private static final Logger LOGGER = Logger.getLogger(DatasetUtil.class.getName()); + /* + * Dataset related operations + */ + public static final byte OP_READ = 0x00; + public static final byte OP_INSERT = 0x01; + public static final byte OP_DELETE = 0x02; + public static final byte OP_UPSERT = 0x03; + + private DatasetUtil() { + } + + public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, + ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) + throws AlgebricksException { + List<List<String>> partitioningKeys = getPartitioningKeys(dataset); + IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()]; + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + // Get comparators for RID fields. + for (int i = 0; i < partitioningKeys.size(); i++) { + try { + bcfs[i] = IndexingConstants.getComparatorFactory(i); + } catch (AsterixException e) { + throw new AlgebricksException(e); + } + } + } else { + InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails(); + for (int i = 0; i < partitioningKeys.size(); i++) { + IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1) + ? 
metaItemType.getSubFieldType(partitioningKeys.get(i)) + : itemType.getSubFieldType(partitioningKeys.get(i)); + bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true); + } + } + return bcfs; + } + + public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + throw new AlgebricksException("not implemented"); + } + List<List<String>> partitioningKeys = getPartitioningKeys(dataset); + int[] bloomFilterKeyFields = new int[partitioningKeys.size()]; + for (int i = 0; i < partitioningKeys.size(); ++i) { + bloomFilterKeyFields[i] = i; + } + return bloomFilterKeyFields; + } + + public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType, + IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + throw new AlgebricksException("not implemented"); + } + List<List<String>> partitioningKeys = getPartitioningKeys(dataset); + IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()]; + for (int i = 0; i < partitioningKeys.size(); i++) { + IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i)); + bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType); + } + return bhffs; + } + + public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType) + throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + throw new AlgebricksException("not implemented"); + } + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + int numKeys = partitioningKeys.size(); + ITypeTraits[] typeTraits; + if (metaItemType != null) { + typeTraits = new ITypeTraits[numKeys + 2]; + List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); + typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType); + for (int i = 0; i < numKeys; i++) { + IAType keyType; + if (indicator.get(i) == 0) { + keyType = itemType.getSubFieldType(partitioningKeys.get(i)); + } else { + keyType = metaItemType.getSubFieldType(partitioningKeys.get(i)); + } + typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); + } + } else { + typeTraits = new ITypeTraits[numKeys + 1]; + for (int i = 0; i < numKeys; i++) { + IAType keyType; + keyType = itemType.getSubFieldType(partitioningKeys.get(i)); + typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); + } + } + typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); + return typeTraits; + } + + public static List<List<String>> getPartitioningKeys(Dataset dataset) { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return IndexingConstants + .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); + } + return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey(); + } + + public static List<String> getFilterField(Dataset dataset) { + return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField(); + } + + public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset, + ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) + throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return null; + } + List<String> filterField = 
getFilterField(dataset); + if (filterField == null) { + return null; + } + IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1]; + IAType type = itemType.getSubFieldType(filterField); + bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true); + return bcfs; + } + + public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType) + throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return null; + } + List<String> filterField = getFilterField(dataset); + if (filterField == null) { + return null; + } + ITypeTraits[] typeTraits = new ITypeTraits[1]; + IAType type = itemType.getSubFieldType(filterField); + typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type); + return typeTraits; + } + + public static int[] createFilterFields(Dataset dataset) throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return null; + } + + List<String> filterField = getFilterField(dataset); + if (filterField == null) { + return null; + } + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + int numKeys = partitioningKeys.size(); + + int[] filterFields = new int[1]; + filterFields[0] = numKeys + 1; + return filterFields; + } + + public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException { + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return null; + } + + List<String> filterField = getFilterField(dataset); + if (filterField == null) { + return null; + } + + List<List<String>> partitioningKeys = getPartitioningKeys(dataset); + int valueFields = dataset.hasMetaPart() ? 2 : 1; + int[] btreeFields = new int[partitioningKeys.size() + valueFields]; + for (int i = 0; i < btreeFields.length; ++i) { + btreeFields[i] = i; + } + return btreeFields; + } + + public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) { + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + for (int i = 0; i < partitioningKeys.size(); i++) { + if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) { + return i; + } + } + return -1; + } + + public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset, + MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException { + String policyName = dataset.getCompactionPolicy(); + CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, + MetadataConstants.METADATA_DATAVERSE_NAME, policyName); + String compactionPolicyFactoryClassName = compactionPolicy.getClassName(); + ILSMMergePolicyFactory mergePolicyFactory; + try { + mergePolicyFactory = + (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); + if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { + ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId()); + } + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new AlgebricksException(e); + } + Map<String, String> properties = dataset.getCompactionPolicyProperties(); + return new Pair<>(mergePolicyFactory, properties); + } + + @SuppressWarnings("unchecked") + public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType) + throws HyracksDataException { + IARecordBuilder propertyRecordBuilder = new RecordBuilder(); + 
ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); + propertyRecordBuilder.reset(recordType); + AMutableString aString = new AMutableString(""); + ISerializerDeserializer<AString> stringSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); + + // write field 0 + fieldValue.reset(); + aString.setValue(name); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + propertyRecordBuilder.addField(0, fieldValue); + + // write field 1 + fieldValue.reset(); + aString.setValue(value); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + propertyRecordBuilder.addField(1, fieldValue); + + propertyRecordBuilder.write(out, true); + } + + public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset) + throws AlgebricksException { + if (dataset.hasMetaPart()) { + return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), + dataset.getMetaItemTypeName()); + } + return null; + } + + public static JobSpecification createDropDatasetJobSpec(Dataset dataset, Index primaryIndex, + MetadataProvider metadataProvider) + throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { + String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName(); + LOGGER.info("DROP DATASETPATH: " + datasetPath); + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return RuntimeUtils.createJobSpecification(); + } + boolean temp = dataset.getDatasetDetails().isTemp(); + ARecordType itemType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset); + JobSpecification specPrimary = RuntimeUtils.createJobSpecification(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), + dataset.getDatasetName(), temp); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory( + metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + IndexDropOperatorDescriptor primaryBtreeDrop = + new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(), + storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, + indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop, + splitsAndConstraint.second); + specPrimary.addRoot(primaryBtreeDrop); + return specPrimary; + } + + public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset) + throws AlgebricksException { + String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + 
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), + dataset.getDatasetName(), indexName, true); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + fileIndex, null, null, compactionInfo.first, compactionInfo.second); + IndexDropOperatorDescriptor btreeDrop = + new IndexDropOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), + storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, + dataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, + splitsAndConstraint.second); + spec.addRoot(btreeDrop); + return spec; + } + + public static JobSpecification dropDatasetJobSpec(Dataset dataset, Index primaryIndex, + MetadataProvider metadataProvider) + throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { + String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName(); + LOGGER.info("DROP DATASETPATH: " + datasetPath); + if (dataset.getDatasetType() == DatasetType.EXTERNAL) { + return RuntimeUtils.createJobSpecification(); + } + + boolean temp = dataset.getDatasetDetails().isTemp(); + ARecordType itemType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset); + JobSpecification specPrimary = RuntimeUtils.createJobSpecification(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), + dataset.getDatasetName(), temp); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + + IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory( + metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + IndexDropOperatorDescriptor primaryBtreeDrop = + new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(), + storageComponentProvider.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, + indexDataflowHelperFactory, storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop, + splitsAndConstraint.second); + + specPrimary.addRoot(primaryBtreeDrop); + + return specPrimary; + } + + public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName, + MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + String dataverseName = dataverse.getDataverseName(); + 
IDataFormat format; + try { + format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); + } catch (Exception e) { + throw new AsterixException(e); + } + Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); + if (dataset == null) { + throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName); + } + Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, + datasetName, datasetName); + boolean temp = dataset.getDatasetDetails().isTemp(); + ARecordType itemType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + // get meta item type + ARecordType metaItemType = null; + if (dataset.hasMetaPart()) { + metaItemType = (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), + dataset.getMetaItemTypeName()); + } + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, + itemType, metaItemType, format.getBinaryComparatorFactoryProvider()); + ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); + int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset); + + ITypeTraits[] filterTypeTraits = DatasetUtil.computeFilterTypeTraits(dataset, itemType); + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtil.computeFilterBinaryComparatorFactories(dataset, + itemType, format.getBinaryComparatorFactoryProvider()); + int[] filterFields = DatasetUtil.createFilterFields(dataset); + int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset); + + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp); + FileSplit[] fs = splitsAndConstraint.first.getFileSplits(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < fs.length; i++) { + sb.append(fs[i] + " "); + } + LOGGER.info("CREATING File Splits: " + sb.toString()); + + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + //prepare a LocalResourceMetadata which will be stored in NC's local resource repository + IResourceFactory localResourceMetadata = new LSMBTreeLocalResourceMetadataFactory(typeTraits, + comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first, + compactionInfo.second, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, + dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index), + storageComponentProvider.getMetadataPageManagerFactory()); + ILocalResourceFactoryProvider localResourceFactoryProvider = + new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + index, itemType, metaItemType, compactionInfo.first, compactionInfo.second); + TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, + splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, + dataflowHelperFactory, localResourceFactoryProvider, 
NoOpOperationCallbackFactory.INSTANCE, + storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, + splitsAndConstraint.second); + spec.addRoot(indexCreateOp); + return spec; + } + + public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName, + MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { + String dataverseName = dataverse.getDataverseName(); + IDataFormat format; + try { + format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance(); + } catch (Exception e) { + throw new AsterixException(e); + } + Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); + if (dataset == null) { + throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName); + } + boolean temp = dataset.getDatasetDetails().isTemp(); + ARecordType itemType = + (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset); + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset, + itemType, metaItemType, format.getBinaryComparatorFactoryProvider()); + ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType); + int[] bloomFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), datasetName, datasetName); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + index, itemType, metaItemType, compactionInfo.first, compactionInfo.second); + LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, + splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, + NoOpOperationCallbackFactory.INSTANCE, + metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, + splitsAndConstraint.second); + spec.addRoot(compactOp); + return spec; + } +}
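
Note for reviewers: the field-numbering helpers in DatasetUtil above are easy to misread. A primary-index tuple is laid out as [key_0 .. key_{n-1}, record, meta?], so with n partitioning keys the record sits at position n and the filter column at n + 1. The following standalone sketch (plain Java, no AsterixDB dependencies; the class and method names are illustrative only and not part of this patch) mirrors what createFilterFields and createBTreeFieldsWhenThereisAFilter compute:

import java.util.Arrays;

// Hypothetical illustration of DatasetUtil's primary-index field layout.
public class FieldLayoutSketch {
    // Mirrors createFilterFields: the filter column follows the record,
    // which itself follows the n partitioning keys.
    static int[] filterFields(int numKeys) {
        return new int[] { numKeys + 1 };
    }

    // Mirrors createBTreeFieldsWhenThereisAFilter: all keys plus one value
    // field for the record, plus one more when the dataset has a meta part.
    static int[] btreeFields(int numKeys, boolean hasMetaPart) {
        int valueFields = hasMetaPart ? 2 : 1;
        int[] fields = new int[numKeys + valueFields];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = i;
        }
        return fields;
    }

    public static void main(String[] args) {
        // Two partitioning keys, no meta part: tuple = [key0, key1, record, filter]
        System.out.println(Arrays.toString(filterFields(2)));       // [3]
        System.out.println(Arrays.toString(btreeFields(2, false))); // [0, 1, 2]
    }
}
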
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java deleted file mode 100644 index 18baab1..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.metadata.utils; - -import java.io.DataOutput; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.builders.IARecordBuilder; -import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.metadata.MetadataException; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.CompactionPolicy; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.ExternalDatasetDetails; -import org.apache.asterix.metadata.entities.InternalDatasetDetails; -import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.base.AString; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; -import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; - -public class DatasetUtils { - public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType, - ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) - throws AlgebricksException { - List<List<String>> partitioningKeys = getPartitioningKeys(dataset); - IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()]; - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - // Get comparators for RID fields. - for (int i = 0; i < partitioningKeys.size(); i++) { - try { - bcfs[i] = IndexingConstants.getComparatorFactory(i); - } catch (AsterixException e) { - throw new AlgebricksException(e); - } - } - } else { - InternalDatasetDetails dsd = (InternalDatasetDetails) dataset.getDatasetDetails(); - for (int i = 0; i < partitioningKeys.size(); i++) { - IAType keyType = (dataset.hasMetaPart() && dsd.getKeySourceIndicator().get(i).intValue() == 1) - ? metaItemType.getSubFieldType(partitioningKeys.get(i)) - : itemType.getSubFieldType(partitioningKeys.get(i)); - bcfs[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true); - } - } - return bcfs; - } - - public static int[] createBloomFilterKeyFields(Dataset dataset) throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - throw new AlgebricksException("not implemented"); - } - List<List<String>> partitioningKeys = getPartitioningKeys(dataset); - int[] bloomFilterKeyFields = new int[partitioningKeys.size()]; - for (int i = 0; i < partitioningKeys.size(); ++i) { - bloomFilterKeyFields[i] = i; - } - return bloomFilterKeyFields; - } - - public static IBinaryHashFunctionFactory[] computeKeysBinaryHashFunFactories(Dataset dataset, ARecordType itemType, - IBinaryHashFunctionFactoryProvider hashFunProvider) throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - throw new AlgebricksException("not implemented"); - } - List<List<String>> partitioningKeys = getPartitioningKeys(dataset); - IBinaryHashFunctionFactory[] bhffs = new IBinaryHashFunctionFactory[partitioningKeys.size()]; - for (int i = 0; i < partitioningKeys.size(); i++) { - IAType keyType = itemType.getSubFieldType(partitioningKeys.get(i)); - bhffs[i] = hashFunProvider.getBinaryHashFunctionFactory(keyType); - } - return bhffs; - } - - public static ITypeTraits[] computeTupleTypeTraits(Dataset dataset, ARecordType itemType, ARecordType metaItemType) - throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - throw new AlgebricksException("not implemented"); - } - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset); - int numKeys = partitioningKeys.size(); - ITypeTraits[] typeTraits; - if (metaItemType != null) { - typeTraits = new ITypeTraits[numKeys + 2]; - List<Integer> indicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); - typeTraits[numKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaItemType); - for (int i = 0; i < numKeys; i++) { - IAType keyType; - if (indicator.get(i) == 0) { - keyType = itemType.getSubFieldType(partitioningKeys.get(i)); - } else { - keyType = metaItemType.getSubFieldType(partitioningKeys.get(i)); - } - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - } - } else { - typeTraits = new ITypeTraits[numKeys + 1]; - for (int i = 0; i < numKeys; i++) { - IAType keyType; - keyType = 
itemType.getSubFieldType(partitioningKeys.get(i)); - typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); - } - } - typeTraits[numKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); - return typeTraits; - } - - public static List<List<String>> getPartitioningKeys(Dataset dataset) { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties()); - } - return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey(); - } - - public static List<String> getFilterField(Dataset dataset) { - return (((InternalDatasetDetails) dataset.getDatasetDetails())).getFilterField(); - } - - public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset, - ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider) - throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return null; - } - List<String> filterField = getFilterField(dataset); - if (filterField == null) { - return null; - } - IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1]; - IAType type = itemType.getSubFieldType(filterField); - bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true); - return bcfs; - } - - public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType itemType) - throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return null; - } - List<String> filterField = getFilterField(dataset); - if (filterField == null) { - return null; - } - ITypeTraits[] typeTraits = new ITypeTraits[1]; - IAType type = itemType.getSubFieldType(filterField); - typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type); - return typeTraits; - } - - public static int[] createFilterFields(Dataset dataset) throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return null; - } - - List<String> filterField = getFilterField(dataset); - if (filterField == null) { - return null; - } - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset); - int numKeys = partitioningKeys.size(); - - int[] filterFields = new int[1]; - filterFields[0] = numKeys + 1; - return filterFields; - } - - public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) throws AlgebricksException { - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return null; - } - - List<String> filterField = getFilterField(dataset); - if (filterField == null) { - return null; - } - - List<List<String>> partitioningKeys = getPartitioningKeys(dataset); - int valueFields = dataset.hasMetaPart() ? 
2 : 1; - int[] btreeFields = new int[partitioningKeys.size() + valueFields]; - for (int i = 0; i < btreeFields.length; ++i) { - btreeFields[i] = i; - } - return btreeFields; - } - - public static int getPositionOfPartitioningKeyField(Dataset dataset, String fieldExpr) { - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset); - for (int i = 0; i < partitioningKeys.size(); i++) { - if ((partitioningKeys.get(i).size() == 1) && partitioningKeys.get(i).get(0).equals(fieldExpr)) { - return i; - } - } - return -1; - } - - public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset, - MetadataTransactionContext mdTxnCtx) throws AlgebricksException, MetadataException { - String policyName = dataset.getCompactionPolicy(); - CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, - MetadataConstants.METADATA_DATAVERSE_NAME, policyName); - String compactionPolicyFactoryClassName = compactionPolicy.getClassName(); - ILSMMergePolicyFactory mergePolicyFactory; - try { - mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); - if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { - ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId()); - } - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new AlgebricksException(e); - } - Map<String, String> properties = dataset.getCompactionPolicyProperties(); - return new Pair<ILSMMergePolicyFactory, Map<String, String>>(mergePolicyFactory, properties); - } - - @SuppressWarnings("unchecked") - public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType) - throws HyracksDataException { - IARecordBuilder propertyRecordBuilder = new RecordBuilder(); - ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); - propertyRecordBuilder.reset(recordType); - AMutableString aString = new AMutableString(""); - ISerializerDeserializer<AString> stringSerde = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING); - - // write field 0 - fieldValue.reset(); - aString.setValue(name); - stringSerde.serialize(aString, fieldValue.getDataOutput()); - propertyRecordBuilder.addField(0, fieldValue); - - // write field 1 - fieldValue.reset(); - aString.setValue(value); - stringSerde.serialize(aString, fieldValue.getDataOutput()); - propertyRecordBuilder.addField(1, fieldValue); - - propertyRecordBuilder.write(out, true); - } - - public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset) - throws AlgebricksException { - if (dataset.hasMetaPart()) { - return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(), - dataset.getMetaItemTypeName()); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java index 17a3ebe..2e35ed4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java @@ -34,11 +34,11 @@ import org.apache.asterix.metadata.entities.Dataset; * @author alamouda */ public class ExternalDatasetsRegistry { - public static ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry(); - private ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister; + public static final ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry(); + private final ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister; private ExternalDatasetsRegistry() { - globalRegister = new ConcurrentHashMap<String, ExternalDatasetAccessManager>(); + globalRegister = new ConcurrentHashMap<>(); } /** @@ -59,12 +59,12 @@ public class ExternalDatasetsRegistry { public int getAndLockDatasetVersion(Dataset dataset, MetadataProvider metadataProvider) { - Map<String, Integer> locks = null; + Map<String, Integer> locks; String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName(); // check first if the lock was aquired already locks = metadataProvider.getLocks(); if (locks == null) { - locks = new HashMap<String, Integer>(); + locks = new HashMap<>(); metadataProvider.setLocks(locks); } else { // if dataset was accessed already by this job, return the registered version @@ -130,7 +130,10 @@ public class ExternalDatasetsRegistry { // if dataset was accessed already by this job, return the registered version Set<Entry<String, Integer>> aquiredLocks = locks.entrySet(); for (Entry<String, Integer> entry : aquiredLocks) { - globalRegister.get(entry.getKey()).queryEnd(entry.getValue()); + ExternalDatasetAccessManager accessManager = globalRegister.get(entry.getKey()); + if (accessManager != null) { + accessManager.queryEnd(entry.getValue()); + } } locks.clear(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java new file mode 100644 index 0000000..249f035 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; +import org.apache.asterix.common.config.DatasetConfig.TransactionState; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.transactions.IResourceFactory; +import org.apache.asterix.external.api.IAdapterFactory; +import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.external.indexing.FilesIndexDescription; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; +import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor; +import org.apache.asterix.external.provider.AdapterFactoryProvider; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.ExternalDatasetDetails; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; +import 
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; +import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; +import org.apache.hyracks.storage.common.file.LocalResource; + +public class ExternalIndexingOperations { + private static final Logger LOGGER = Logger.getLogger(ExternalIndexingOperations.class.getName()); + public static final List<List<String>> FILE_INDEX_FIELD_NAMES = + Collections.unmodifiableList(Collections.singletonList(Collections.singletonList(""))); + public static final List<IAType> FILE_INDEX_FIELD_TYPES = + Collections.unmodifiableList(Collections.singletonList(BuiltinType.ASTRING)); + + private ExternalIndexingOperations() { + } + + public static boolean isIndexible(ExternalDatasetDetails ds) { + return ds.getAdapter().equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER); + } + + public static boolean isRefereshActive(ExternalDatasetDetails ds) { + return ds.getState() != TransactionState.COMMIT; + } + + public static boolean isValidIndexName(String datasetName, String indexName) { + return !datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName); + } + + public static int getRIDSize(Dataset dataset) { + ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails(); + return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)); + } + + public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) { + ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails(); + return IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)); + } + + public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() { + return IndexingConstants.getBuddyBtreeComparatorFactories(); + } + + public static List<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) throws AlgebricksException { + ArrayList<ExternalFile> files = new ArrayList<>(); + ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); + try { + // Create the file system object + FileSystem fs = getFileSystemObject(datasetDetails.getProperties()); + // Get paths of dataset + String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH); + String[] paths = path.split(","); + + // Add fileStatuses to files + for (String aPath : paths) { + FileStatus[] fileStatuses = fs.listStatus(new Path(aPath)); + for (int i = 0; i < fileStatuses.length; i++) { + int nextFileNumber = files.size(); + handleFile(dataset, files, fs, fileStatuses[i], nextFileNumber); + } + } + // Close file system + fs.close(); + if (files.isEmpty()) { + throw new AlgebricksException("File Snapshot retrieved from external file system is empty"); + } + return files; + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception while trying to get snapshot from external system", e); + throw new AlgebricksException("Unable to get list of HDFS files", e); + } + } + + private static void handleFile(Dataset dataset, List<ExternalFile> files, FileSystem fs, FileStatus fileStatus, + int nextFileNumber) throws IOException { + if (fileStatus.isDirectory()) { + listSubFiles(dataset, fs, fileStatus, files); + } else { + files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, +
fileStatus.getPath().toUri().getPath(), new Date(fileStatus.getModificationTime()), + fileStatus.getLen(), ExternalFilePendingOp.NO_OP)); + } + } + + /* list all files under the directory + * src is expected to be a folder + */ + private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, List<ExternalFile> files) + throws IOException { + Path path = src.getPath(); + FileStatus[] fileStatuses = srcFs.listStatus(path); + for (int i = 0; i < fileStatuses.length; i++) { + int nextFileNumber = files.size(); + if (fileStatuses[i].isDirectory()) { + listSubFiles(dataset, srcFs, fileStatuses[i], files); + } else { + files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, + fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()), + fileStatuses[i].getLen(), ExternalFilePendingOp.NO_OP)); + } + } + } + + public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException { + Configuration conf = new Configuration(); + conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim()); + conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName()); + return FileSystem.get(conf); + } + + public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset, + List<ExternalFile> externalFilesSnapshot, MetadataProvider metadataProvider, boolean createIndex) + throws AlgebricksException { + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + JobSpecification spec = RuntimeUtils.createJobSpecification(); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; + Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = + metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), + dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), true); + IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first; + FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); + IResourceFactory localResourceMetadata = new ExternalBTreeLocalResourceMetadataFactory( + filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, + FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, false, dataset.getDatasetId(), + mergePolicyFactory, mergePolicyFactoryProperties, dataset.getIndexOperationTrackerFactory(fileIndex), + dataset.getIoOperationCallbackFactory(fileIndex), + storageComponentProvider.getMetadataPageManagerFactory()); + PersistentLocalResourceFactoryProvider localResourceFactoryProvider = + new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.ExternalBTreeResource); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + ExternalFilesIndexOperatorDescriptor 
externalFilesOp = + new ExternalFilesIndexOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), + storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider, + dataflowHelperFactory, localResourceFactoryProvider, externalFilesSnapshot, createIndex, + storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp, + secondarySplitsAndConstraint.second); + spec.addRoot(externalFilesOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + /** + * This method creates an indexing operator that indexes records in HDFS. + * + * @param jobSpec + * @param itemType + * @param dataset + * @param files + * @param indexerDesc + * @return + * @throws AlgebricksException + * @throws HyracksDataException + */ + private static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> getIndexingOperator( + MetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset, + List<ExternalFile> files, RecordDescriptor indexerDesc) throws HyracksDataException, AlgebricksException { + ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); + Map<String, String> configuration = externalDatasetDetails.getProperties(); + IAdapterFactory adapterFactory = + AdapterFactoryProvider.getIndexingAdapterFactory(metadataProvider.getLibraryManager(), + externalDatasetDetails.getAdapter(), configuration, (ARecordType) itemType, files, true, null); + return new Pair<>(new ExternalScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory), + adapterFactory.getPartitionConstraint()); + } + + public static Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp( + JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ARecordType itemType, + RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException { + return getIndexingOperator(metadataProvider, spec, itemType, dataset, + files == null ?
MetadataManager.INSTANCE + .getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset) : files, + indexerDesc); + } + + /** + * At the end of this method, we expect to have 4 sets as follows: + * metadataFiles should contain only the files that are appended in their original state + * addedFiles should contain new files that have numbers assigned starting after the max original file number + * deletedFiles should contain files that are no longer in the file system + * appendedFiles should have the new file information of existing files + * The method returns true only when there is zero delta (i.e., the dataset is up to date) + * + * @param dataset + * @param metadataFiles + * @param addedFiles + * @param deletedFiles + * @param appendedFiles + * @return + * @throws AlgebricksException + */ + public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles, + List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles) + throws AlgebricksException { + boolean uptodate = true; + int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1; + + List<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset); + + // Loop over file system files (taking care of added files) + for (ExternalFile fileSystemFile : fileSystemFiles) { + boolean fileFound = false; + Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator(); + while (mdFilesIterator.hasNext()) { + ExternalFile metadataFile = mdFilesIterator.next(); + if (!fileSystemFile.getFileName().equals(metadataFile.getFileName())) { + continue; + } + // Same file name + if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) { + // Same timestamp + if (fileSystemFile.getSize() == metadataFile.getSize()) { + // Same size -> no op + mdFilesIterator.remove(); + fileFound = true; + } else { + // Different size -> append op + metadataFile.setPendingOp(ExternalFilePendingOp.APPEND_OP); + fileSystemFile.setPendingOp(ExternalFilePendingOp.APPEND_OP); + appendedFiles.add(fileSystemFile); + fileFound = true; + uptodate = false; + } + } else { + // Same file name, different modification date -> delete and add + metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP); + deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), + 0, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), metadataFile.getSize(), + ExternalFilePendingOp.DROP_OP)); + fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP); + fileSystemFile.setFileNumber(newFileNumber); + addedFiles.add(fileSystemFile); + newFileNumber++; + fileFound = true; + uptodate = false; + } + if (fileFound) { + break; + } + } + if (!fileFound) { + // File not stored previously in metadata -> pending add op + fileSystemFile.setPendingOp(ExternalFilePendingOp.ADD_OP); + fileSystemFile.setFileNumber(newFileNumber); + addedFiles.add(fileSystemFile); + newFileNumber++; + uptodate = false; + } + } + + // Done with files from external file system -> metadata files now contain both deleted files and appended ones + // first, correct number assignment to deleted and updated files + for (ExternalFile deletedFile : deletedFiles) { + deletedFile.setFileNumber(newFileNumber); + newFileNumber++; + } + for (ExternalFile appendedFile : appendedFiles) { + appendedFile.setFileNumber(newFileNumber); + newFileNumber++; + } + + // include the remaining deleted files + Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator(); + while
(mdFilesIterator.hasNext()) { + ExternalFile metadataFile = mdFilesIterator.next(); + if (metadataFile.getPendingOp() == ExternalFilePendingOp.NO_OP) { + metadataFile.setPendingOp(ExternalFilePendingOp.DROP_OP); + deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), + newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), + metadataFile.getSize(), metadataFile.getPendingOp())); + newFileNumber++; + uptodate = false; + } + } + return uptodate; + } + + public static Dataset createTransactionDataset(Dataset dataset) { + ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails(); + ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(), + originalDsd.getTimestamp(), TransactionState.BEGIN); + return new Dataset(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getItemTypeDataverseName(), + dataset.getItemTypeName(), dataset.getNodeGroupName(), dataset.getCompactionPolicy(), + dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), DatasetType.EXTERNAL, + dataset.getDatasetId(), dataset.getPendingOp()); + } + + public static JobSpecification buildDropFilesIndexJobSpec(MetadataProvider metadataProvider, Dataset dataset) + throws AlgebricksException { + String indexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); + JobSpecification spec = RuntimeUtils.createJobSpecification(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), + dataset.getDatasetName(), indexName, true); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + fileIndex, null, null, compactionInfo.first, compactionInfo.second); + IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, + metadataProvider.getStorageComponentProvider().getStorageManager(), + metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(), + splitsAndConstraint.first, dataflowHelperFactory, + metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, + splitsAndConstraint.second); + spec.addRoot(btreeDrop); + + return spec; + } + + public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles, + List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider) + throws AlgebricksException { + ArrayList<ExternalFile> files = new ArrayList<>(); + for (ExternalFile file : metadataFiles) { + if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { + files.add(file); + } else if (file.getPendingOp() == ExternalFilePendingOp.APPEND_OP) { + for (ExternalFile appendedFile : appendedFiles) { + if (appendedFile.getFileName().equals(file.getFileName())) { + files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), + file.getFileNumber(), 
file.getFileName(), file.getLastModefiedTime(), + appendedFile.getSize(), ExternalFilePendingOp.NO_OP)); + } + } + } + } + for (ExternalFile file : addedFiles) { + files.add(file); + } + Collections.sort(files); + return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false); + } + + public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles, + List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, MetadataProvider metadataProvider) + throws AlgebricksException { + // Create files list + ArrayList<ExternalFile> files = new ArrayList<>(); + + for (ExternalFile metadataFile : metadataFiles) { + if (metadataFile.getPendingOp() != ExternalFilePendingOp.APPEND_OP) { + files.add(metadataFile); + } else { + metadataFile.setPendingOp(ExternalFilePendingOp.NO_OP); + files.add(metadataFile); + } + } + // add new files + for (ExternalFile file : addedFiles) { + files.add(file); + } + // add appended files + for (ExternalFile file : appendedFiles) { + files.add(file); + } + return IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, null, null, null, null, metadataProvider, files); + } + + public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) + throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext()); + boolean temp = ds.getDatasetDetails().isTemp(); + ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; + Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp); + IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + ds.getDataverseName(), ds.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); + + ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>(); + ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>(); + for (Index index : indexes) { + if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + index.getIndexName(), temp); + IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + treeDataflowHelperFactories.add(indexDataflowHelperFactory); + treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, + RuntimeComponentsProvider.RUNTIME_PROVIDER, 
RuntimeComponentsProvider.RUNTIME_PROVIDER)); + } + } + + ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec, + filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos); + + spec.addRoot(op); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, + filesIndexSplitsAndConstraint.second); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) + throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext()); + ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; + Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; + + boolean temp = ds.getDatasetDetails().isTemp(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp); + IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + ds.getDataverseName(), ds.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); + + ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>(); + ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>(); + for (Index index : indexes) { + if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + index.getIndexName(), temp); + IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + treeDataflowHelperFactories.add(indexDataflowHelperFactory); + treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER)); + } + } + + ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec, + filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos); + + spec.addRoot(op); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, + filesIndexSplitsAndConstraint.second); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + + } + + public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) + throws AlgebricksException { + JobSpecification 
spec = RuntimeUtils.createJobSpecification(); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext()); + ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; + Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; + boolean temp = ds.getDatasetDetails().isTemp(); + + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp); + IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + ds.getDataverseName(), ds.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory filesIndexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); + + ArrayList<IIndexDataflowHelperFactory> treeDataflowHelperFactories = new ArrayList<>(); + ArrayList<IndexInfoOperatorDescriptor> treeInfos = new ArrayList<>(); + for (Index index : indexes) { + if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(), + index.getIndexName(), temp); + IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory( + metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + treeDataflowHelperFactories.add(indexDataflowHelperFactory); + treeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER)); + } + } + + ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec, + filesIndexDataflowHelperFactory, filesIndexInfo, treeDataflowHelperFactories, treeInfos); + + spec.addRoot(op); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, + filesIndexSplitsAndConstraint.second); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, MetadataProvider metadataProvider, + IStorageComponentProvider storageComponentProvider) throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; + Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = + metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), + dataset.getDatasetName(), IndexingConstants.getFilesIndexName(dataset.getDatasetName()), 
true); + IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first; + + String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset); + Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); + IIndexDataflowHelperFactory dataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider, + fileIndex, null, null, mergePolicyFactory, mergePolicyFactoryProperties); + FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); + LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, + secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, + FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, dataflowHelperFactory, + NoOpOperationCallbackFactory.INSTANCE, storageComponentProvider.getMetadataPageManagerFactory()); + spec.addRoot(compactOp); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, + secondarySplitsAndConstraint.second); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + public static boolean isFileIndex(Index index) { + return index.getIndexName().equals(IndexingConstants.getFilesIndexName(index.getDatasetName())); + } +}
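
For readers following isDatasetUptodate above: the sketch below distills its snapshot-diff classification (unchanged / appended / added / deleted) into a self-contained program. FileInfo and classify are hypothetical simplifications introduced here for illustration, not AsterixDB API: plain name/time/size fields stand in for the ExternalFile entity, and file numbers and pending-op flags are omitted.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ExternalFile: just name, last-modified time, size.
final class FileInfo {
    final String name;
    final long modTime;
    final long size;
    FileInfo(String name, long modTime, long size) {
        this.name = name;
        this.modTime = modTime;
        this.size = size;
    }
}

public class SnapshotDiffSketch {
    // Classifies the file-system snapshot against the metadata snapshot.
    // Returns true iff there is no delta (the dataset is up to date).
    static boolean classify(List<FileInfo> metadata, List<FileInfo> fileSystem,
            List<FileInfo> added, List<FileInfo> deleted, List<FileInfo> appended) {
        boolean uptodate = true;
        List<FileInfo> remaining = new ArrayList<>(metadata);
        for (FileInfo fs : fileSystem) {
            boolean found = false;
            for (Iterator<FileInfo> it = remaining.iterator(); it.hasNext();) {
                FileInfo md = it.next();
                if (!fs.name.equals(md.name)) {
                    continue;
                }
                found = true;
                it.remove();
                if (fs.modTime != md.modTime) {
                    // Same name, different timestamp -> treated as delete + add.
                    deleted.add(md);
                    added.add(fs);
                    uptodate = false;
                } else if (fs.size != md.size) {
                    // Same timestamp, different size -> append.
                    appended.add(fs);
                    uptodate = false;
                } // Same timestamp and size -> unchanged, no op.
                break;
            }
            if (!found) {
                // Not in the metadata snapshot -> newly added file.
                added.add(fs);
                uptodate = false;
            }
        }
        // Whatever is left in the metadata snapshot no longer exists on disk.
        deleted.addAll(remaining);
        if (!remaining.isEmpty()) {
            uptodate = false;
        }
        return uptodate;
    }

    public static void main(String[] args) {
        List<FileInfo> metadata = List.of(new FileInfo("a.json", 100, 10),
                new FileInfo("b.json", 100, 20), new FileInfo("c.json", 100, 30));
        List<FileInfo> fileSystem = List.of(new FileInfo("a.json", 100, 10), // unchanged
                new FileInfo("b.json", 100, 25), // appended
                new FileInfo("d.json", 200, 5)); // added; c.json was deleted
        List<FileInfo> added = new ArrayList<>();
        List<FileInfo> deleted = new ArrayList<>();
        List<FileInfo> appended = new ArrayList<>();
        boolean uptodate = classify(metadata, fileSystem, added, deleted, appended);
        System.out.println("uptodate=" + uptodate + " added=" + added.size()
                + " deleted=" + deleted.size() + " appended=" + appended.size());
    }
}

Note one deliberate simplification: the real method keeps appended and dropped entries inside metadataFiles (removing only the unchanged ones), whereas this sketch always removes matched entries so the leftover list is exactly the deleted set; the resulting classification is the same.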

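A small design note on buildCommitJob, buildAbortOp, and buildRecoverOp above: the three methods are identical except for the operator descriptor they instantiate at the root (commit / abort / recover). Below is a minimal sketch of how the shared setup could sit behind a factory parameter; every type here (Job, Op, the record operators) is a hypothetical stand-in for illustration, not the Hyracks API.

import java.util.List;
import java.util.function.BiFunction;

public class ExternalIndexJobSketch {
    // Hypothetical stand-ins for JobSpecification and the operator descriptors.
    interface Op {}
    static final class Job {
        Op root;
        void addRoot(Op op) { this.root = op; }
    }
    record CommitOp(String filesIndex, List<String> indexes) implements Op {}
    record AbortOp(String filesIndex, List<String> indexes) implements Op {}
    record RecoverOp(String filesIndex, List<String> indexes) implements Op {}

    // One shared builder: the common setup (merge policy, splits, dataflow
    // helpers in the real code) would happen once here; only the root
    // operator varies between the three job types.
    static Job buildTransactionJob(String filesIndex, List<String> indexNames,
            BiFunction<String, List<String>, Op> opFactory) {
        Job spec = new Job();
        spec.addRoot(opFactory.apply(filesIndex, indexNames));
        return spec;
    }

    public static void main(String[] args) {
        List<String> indexes = List.of("idx_a", "idx_b");
        Job commit = buildTransactionJob("files_index", indexes, CommitOp::new);
        Job abort = buildTransactionJob("files_index", indexes, AbortOp::new);
        Job recover = buildTransactionJob("files_index", indexes, RecoverOp::new);
        System.out.println(commit.root + " / " + abort.root + " / " + recover.root);
    }
}

The duplication in the committed code may well be intentional, since the three job types can evolve independently; this is only one possible cleanup, not a claim about the authors' design.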