http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java new file mode 100644 index 0000000..f7e569c --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.metadata.utils; + +import java.io.DataOutput; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.context.ITransactionSubsystemProvider; +import org.apache.asterix.common.context.TransactionSubsystemProvider; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor; +import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; +import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; +import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.runtime.evaluators.functions.AndDescriptor; +import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor; +import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor; +import org.apache.asterix.runtime.evaluators.functions.NotDescriptor; 
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; +import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; +import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor; +import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; +import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; + +@SuppressWarnings("rawtypes") +// TODO: We should eventually have a hierarchy of classes that can create all +// possible index job specs, +// not just for creation. 
+public abstract class SecondaryIndexOperationsHelper { + protected final PhysicalOptimizationConfig physOptConf; + protected final MetadataProvider metadataProvider; + protected final Dataset dataset; + protected final Index index; + protected final ARecordType itemType; + protected final ARecordType metaType; + protected final ARecordType enforcedItemType; + protected final ARecordType enforcedMetaType; + protected ISerializerDeserializer metaSerde; + protected ISerializerDeserializer payloadSerde; + protected IFileSplitProvider primaryFileSplitProvider; + protected AlgebricksPartitionConstraint primaryPartitionConstraint; + protected IFileSplitProvider secondaryFileSplitProvider; + protected AlgebricksPartitionConstraint secondaryPartitionConstraint; + protected boolean anySecondaryKeyIsNullable = false; + protected long numElementsHint; + protected IBinaryComparatorFactory[] primaryComparatorFactories; + protected int[] primaryBloomFilterKeyFields; + protected RecordDescriptor primaryRecDesc; + protected IBinaryComparatorFactory[] secondaryComparatorFactories; + protected ITypeTraits[] secondaryTypeTraits; + protected int[] secondaryBloomFilterKeyFields; + protected RecordDescriptor secondaryRecDesc; + protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories; + protected IPropertiesProvider propertiesProvider; + protected ILSMMergePolicyFactory mergePolicyFactory; + protected Map<String, String> mergePolicyFactoryProperties; + protected RecordDescriptor enforcedRecDesc; + protected int numFilterFields; + protected List<String> filterFieldName; + protected ITypeTraits[] filterTypeTraits; + protected IBinaryComparatorFactory[] filterCmpFactories; + protected int[] secondaryFilterFields; + protected int[] primaryFilterFields; + protected int[] primaryBTreeFields; + protected int[] secondaryBTreeFields; + protected List<ExternalFile> externalFiles; + protected int numPrimaryKeys; + + // Prevent public construction. Should be created via createIndexOperationsHelper(). 
+ protected SecondaryIndexOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf, + IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType, + ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) { + this.dataset = dataset; + this.index = index; + this.physOptConf = physOptConf; + this.propertiesProvider = propertiesProvider; + this.metadataProvider = metadataProvider; + this.itemType = recType; + this.metaType = metaType; + this.enforcedItemType = enforcedType; + this.enforcedMetaType = enforcedMetaType; + } + + public static SecondaryIndexOperationsHelper createIndexOperationsHelper(Dataset dataset, Index index, + MetadataProvider metadataProvider, PhysicalOptimizationConfig physOptConf, ARecordType recType, + ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) throws AlgebricksException { + IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; + SecondaryIndexOperationsHelper indexOperationsHelper; + switch (index.getIndexType()) { + case BTREE: + indexOperationsHelper = + new SecondaryBTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider, + metadataProvider, recType, metaType, enforcedType, enforcedMetaType); + break; + case RTREE: + indexOperationsHelper = + new SecondaryRTreeOperationsHelper(dataset, index, physOptConf, asterixPropertiesProvider, + metadataProvider, recType, metaType, enforcedType, enforcedMetaType); + break; + case SINGLE_PARTITION_WORD_INVIX: + case SINGLE_PARTITION_NGRAM_INVIX: + case LENGTH_PARTITIONED_WORD_INVIX: + case LENGTH_PARTITIONED_NGRAM_INVIX: + indexOperationsHelper = new SecondaryInvertedIndexOperationsHelper(dataset, index, physOptConf, + asterixPropertiesProvider, metadataProvider, recType, metaType, enforcedType, + enforcedMetaType); + break; + default: + throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE, index.getIndexType()); + } + indexOperationsHelper.init(); + return indexOperationsHelper; + } + + public abstract JobSpecification buildCreationJobSpec() throws AlgebricksException; + + public abstract JobSpecification buildLoadingJobSpec() throws AlgebricksException; + + public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException; + + protected void init() throws AlgebricksException { + payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); + metaSerde = + metaType == null ? 
null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(), + index.getIndexName(), dataset.getDatasetDetails().isTemp()); + secondaryFileSplitProvider = secondarySplitsAndConstraint.first; + secondaryPartitionConstraint = secondarySplitsAndConstraint.second; + numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size(); + if (dataset.getDatasetType() == DatasetType.INTERNAL) { + filterFieldName = DatasetUtil.getFilterField(dataset); + if (filterFieldName != null) { + numFilterFields = 1; + } else { + numFilterFields = 0; + } + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), + dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp()); + primaryFileSplitProvider = primarySplitsAndConstraint.first; + primaryPartitionConstraint = primarySplitsAndConstraint.second; + setPrimaryRecDescAndComparators(); + } + setSecondaryRecDescAndComparators(); + numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset); + Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = + DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext()); + mergePolicyFactory = compactionInfo.first; + mergePolicyFactoryProperties = compactionInfo.second; + if (numFilterFields > 0) { + setFilterTypeTraitsAndComparators(); + } + } + + protected void setFilterTypeTraitsAndComparators() throws AlgebricksException { + filterTypeTraits = new ITypeTraits[numFilterFields]; + filterCmpFactories = new IBinaryComparatorFactory[numFilterFields]; + secondaryFilterFields = new int[numFilterFields]; + primaryFilterFields = new int[numFilterFields]; + primaryBTreeFields = new int[numPrimaryKeys + 1]; + secondaryBTreeFields = new int[index.getKeyFieldNames().size() + numPrimaryKeys]; + for (int i = 0; i < primaryBTreeFields.length; i++) { + primaryBTreeFields[i] = i; + } + for (int i = 0; i < secondaryBTreeFields.length; i++) { + secondaryBTreeFields[i] = i; + } + + IAType type = itemType.getSubFieldType(filterFieldName); + filterCmpFactories[0] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true); + filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type); + secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys; + primaryFilterFields[0] = numPrimaryKeys + 1; + } + + protected abstract int getNumSecondaryKeys(); + + protected void setPrimaryRecDescAndComparators() throws AlgebricksException { + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(dataset); + ISerializerDeserializer[] primaryRecFields = + new ISerializerDeserializer[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)]; + ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 
1 : 0)]; + primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys]; + primaryBloomFilterKeyFields = new int[numPrimaryKeys]; + ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider(); + List<Integer> indicators = null; + if (dataset.hasMetaPart()) { + indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); + } + for (int i = 0; i < numPrimaryKeys; i++) { + IAType keyType = + (indicators == null || indicators.get(i) == 0) ? itemType.getSubFieldType(partitioningKeys.get(i)) + : metaType.getSubFieldType(partitioningKeys.get(i)); + primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType); + primaryComparatorFactories[i] = + BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); + primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); + primaryBloomFilterKeyFields[i] = i; + } + primaryRecFields[numPrimaryKeys] = payloadSerde; + primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); + if (dataset.hasMetaPart()) { + // The meta record, if any, occupies the slot after the payload record. + primaryRecFields[numPrimaryKeys + 1] = metaSerde; + primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType); + } + primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits); + } + + protected abstract void setSecondaryRecDescAndComparators() throws AlgebricksException; + + protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException { + // Build dummy tuple containing one field with a dummy value inside. + ArrayTupleBuilder tb = new ArrayTupleBuilder(1); + DataOutput dos = tb.getDataOutput(); + tb.reset(); + try { + // Serialize dummy value into a field. + IntegerSerializerDeserializer.INSTANCE.serialize(0, dos); + } catch (HyracksDataException e) { + throw new AsterixException(e); + } + // Add dummy field. + tb.addFieldEndOffset(); + ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE }; + RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers); + ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec, + keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp, + primaryPartitionConstraint); + return keyProviderOp; + } + + protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) + throws AlgebricksException { + // -Infinity + int[] lowKeyFields = null; + // +Infinity + int[] highKeyFields = null; + ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; + JobId jobId = JobIdFactory.generateJobId(); + metadataProvider.setJobId(jobId); + boolean isWriteTransaction = metadataProvider.isWriteTransaction(); + IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction); + spec.setJobletEventListenerFactory(jobEventListenerFactory); + Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), + dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); + + boolean temp = dataset.getDatasetDetails().isTemp(); + ISearchOperationCallbackFactory searchCallbackFactory = temp ?
NoOpOperationCallbackFactory.INSTANCE + : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(), + primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); + BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, + RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, + primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories, + primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true, + dataset.getIndexDataflowHelperFactory(metadataProvider, primaryIndex, itemType, metaType, + mergePolicyFactory, mergePolicyFactoryProperties), + false, false, null, searchCallbackFactory, null, null, + metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp, + primaryPartitionConstraint); + return primarySearchOp; + } + + protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec, int numSecondaryKeyFields, + RecordDescriptor secondaryRecDesc) throws AlgebricksException { + int[] outColumns = new int[numSecondaryKeyFields + numFilterFields]; + int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields]; + for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) { + outColumns[i] = numPrimaryKeys + i; + } + int projCount = 0; + for (int i = 0; i < numSecondaryKeyFields; i++) { + projectionList[projCount++] = numPrimaryKeys + i; + } + for (int i = 0; i < numPrimaryKeys; i++) { + projectionList[projCount++] = i; + } + if (numFilterFields > 0) { + projectionList[projCount] = numPrimaryKeys + numSecondaryKeyFields; + } + + IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length]; + for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) { + sefs[i] = secondaryFieldAccessEvalFactories[i]; + } + AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList); + AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1, + new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc }); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp, + primaryPartitionConstraint); + return asterixAssignOp; + } + + protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) { + CastTypeDescriptor castFuncDesc = (CastTypeDescriptor) CastTypeDescriptor.FACTORY.createFunctionDescriptor(); + castFuncDesc.setImmutableStates(enforcedItemType, itemType); + + int[] outColumns = new int[1]; + int[] projectionList = new int[(dataset.hasMetaPart() ? 
2 : 1) + numPrimaryKeys]; + int recordIdx; + //external datascan operator returns a record as the first field, instead of the last in internal case + if (dsType == DatasetType.EXTERNAL) { + recordIdx = 0; + outColumns[0] = 0; + } else { + recordIdx = numPrimaryKeys; + outColumns[0] = numPrimaryKeys; + } + for (int i = 0; i <= numPrimaryKeys; i++) { + projectionList[i] = i; + } + if (dataset.hasMetaPart()) { + projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1; + } + IScalarEvaluatorFactory[] castEvalFact = + new IScalarEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) }; + IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1]; + sefs[0] = castFuncDesc.createEvaluatorFactory(castEvalFact); + AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList); + return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { castAssign }, + new RecordDescriptor[] { enforcedRecDesc }); + } + + protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec, + IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) { + int[] sortFields = new int[secondaryComparatorFactories.length]; + for (int i = 0; i < secondaryComparatorFactories.length; i++) { + sortFields[i] = i; + } + ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec, + physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint); + return sortOp; + } + + protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec, + int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) + throws AlgebricksException { + TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec, + secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER, + RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, + secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields, + fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, + metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, + secondaryPartitionConstraint); + return treeIndexBulkLoadOp; + } + + public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields, + RecordDescriptor secondaryRecDesc) throws AlgebricksException { + IScalarEvaluatorFactory[] andArgsEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeyFields]; + NotDescriptor notDesc = new NotDescriptor(); + IsUnknownDescriptor isUnknownDesc = new IsUnknownDescriptor(); + for (int i = 0; i < numSecondaryKeyFields; i++) { + // Access column i, and apply 'is not null'. 
+ ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i); + IScalarEvaluatorFactory isUnknownEvalFactory = + isUnknownDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { columnAccessEvalFactory }); + IScalarEvaluatorFactory notEvalFactory = + notDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { isUnknownEvalFactory }); + andArgsEvalFactories[i] = notEvalFactory; + } + IScalarEvaluatorFactory selectCond; + if (numSecondaryKeyFields > 1) { + // Create conjunctive condition where all secondary index keys must + // satisfy 'is not null'. + AndDescriptor andDesc = new AndDescriptor(); + selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories); + } else { + selectCond = andArgsEvalFactories[0]; + } + StreamSelectRuntimeFactory select = + new StreamSelectRuntimeFactory(selectCond, null, BinaryBooleanInspector.FACTORY, false, -1, null); + AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1, + new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc }); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp, + primaryPartitionConstraint); + return asterixSelectOp; + } + + // This method creates a source indexing operator for external data + protected ExternalScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec) + throws AlgebricksException { + // A record + primary keys + ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys]; + ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys]; + // payload serde and type traits for the record slot + serdes[0] = payloadSerde; + typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType); + // serdes and type traits for rid fields + for (int i = 1; i < serdes.length; i++) { + serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1); + typeTraits[i] = IndexingConstants.getTypeTraits(i - 1); + } + // output record desc + RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits); + + // Create the operator and its partition constraints + Pair<ExternalScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints; + try { + indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider, + dataset, itemType, indexerDesc, externalFiles); + } catch (Exception e) { + throw new AlgebricksException(e); + } + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first, + indexingOpAndConstraints.second); + + // Set the primary partition constraints to this operator's partition constraints + primaryPartitionConstraint = indexingOpAndConstraints.second; + return indexingOpAndConstraints.first; + } + + protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys, + RecordDescriptor secondaryRecDesc) throws AlgebricksException { + int[] outColumns = new int[numSecondaryKeys]; + int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < numSecondaryKeys; i++) { + outColumns[i] = i + numPrimaryKeys + 1; + projectionList[i] = i + numPrimaryKeys + 1; + } + + IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length]; + for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) { + sefs[i] = secondaryFieldAccessEvalFactories[i]; + } + //add External RIDs to the projection list + for (int i = 0; i < numPrimaryKeys; i++) { + 
projectionList[numSecondaryKeys + i] = i + 1; + } + + AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList); + return new AlgebricksMetaOperatorDescriptor(spec, 1, 1, new IPushRuntimeFactory[] { assign }, + new RecordDescriptor[] { secondaryRecDesc }); + } + + protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec, + int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) + throws AlgebricksException { + // create a list of file ids + int numOfDeletedFiles = 0; + for (ExternalFile file : externalFiles) { + if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { + numOfDeletedFiles++; + } + } + int[] deletedFiles = new int[numOfDeletedFiles]; + int i = 0; + for (ExternalFile file : externalFiles) { + if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { + // advance the slot index so each deleted file number is recorded + deletedFiles[i++] = file.getFileNumber(); + } + } + ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor( + spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, + secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, + secondaryBloomFilterKeyFields, dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE, + deletedFiles, fieldPermutation, fillFactor, numElementsHint, + metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp, + secondaryPartitionConstraint); + return treeIndexBulkLoadOp; + } + + public List<ExternalFile> getExternalFiles() { + return externalFiles; + } + + public void setExternalFiles(List<ExternalFile> externalFiles) { + this.externalFiles = externalFiles; + } +}
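For context on how this base class is consumed: a DDL translator obtains the right concrete helper from the factory, then builds and executes the returned job specifications in order. A minimal caller-side sketch; the metadata entities and the runJob(...) execution hook are assumptions about the surrounding compiler code, not part of this patch:

    SecondaryIndexOperationsHelper helper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(
            dataset, index, metadataProvider, physOptConf, recType, metaType, enforcedType, enforcedMetaType);
    runJob(helper.buildCreationJobSpec()); // instantiate the empty secondary index on all partitions
    runJob(helper.buildLoadingJobSpec());  // scan the primary index (or external data) and bulk load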
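The createFilterNullsSelectOp method above composes its select condition bottom-up from evaluator factories; for a two-key index the resulting predicate is equivalent to the following (an illustration in SQL++-style syntax, not the factory tree the code actually builds):

    not(is_unknown($secondaryKey0)) and not(is_unknown($secondaryKey1))

With a single key, the numSecondaryKeyFields > 1 branch is skipped and the lone not(is_unknown(...)) factory is used directly as the condition.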
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java new file mode 100644 index 0000000..b86004a --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.transactions.IResourceFactory; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.RecordUtil; +import org.apache.asterix.runtime.formats.FormatUtils; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; +import org.apache.hyracks.algebricks.data.ITypeTraitProvider; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; +import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; +import org.apache.hyracks.data.std.primitive.ShortPointable; +import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; +import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; +import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator; +import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor; +import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; +import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; +import org.apache.hyracks.storage.common.file.LocalResource; + +public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper { + + private IAType secondaryKeyType; + private ITypeTraits[] invListsTypeTraits; + private IBinaryComparatorFactory[] tokenComparatorFactories; + private ITypeTraits[] tokenTypeTraits; + private IBinaryTokenizerFactory tokenizerFactory; + // For tokenization, sorting and loading. Represents <token, primary keys>. + private int numTokenKeyPairFields; + private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories; + private RecordDescriptor tokenKeyPairRecDesc; + private boolean isPartitioned; + private int[] invertedIndexFields; + private int[] invertedIndexFieldsForNonBulkLoadOps; + private int[] secondaryFilterFieldsForNonBulkLoadOps; + + protected SecondaryInvertedIndexOperationsHelper(Dataset dataset, Index index, + PhysicalOptimizationConfig physOptConf, IPropertiesProvider propertiesProvider, + MetadataProvider metadataProvider, ARecordType recType, ARecordType metaType, ARecordType enforcedType, + ARecordType enforcedMetaType) { + super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType, + enforcedMetaType); + } + + @Override + @SuppressWarnings("rawtypes") + protected void setSecondaryRecDescAndComparators() throws AlgebricksException { + int numSecondaryKeys = index.getKeyFieldNames().size(); + IndexType indexType = index.getIndexType(); + boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds(); + // Sanity checks. 
+ if (numPrimaryKeys > 1) { + throw new CompilationException( + ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX, indexType, + RecordUtil.toFullyQualifiedName(dataset.getDataverseName(), dataset.getDatasetName())); + } + if (numSecondaryKeys > 1) { + throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys, + indexType, 1); + } + if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX + || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { + isPartitioned = true; + } else { + isPartitioned = false; + } + // Prepare record descriptor used in the assign op, and the optional + // select op. + secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields]; + ISerializerDeserializer[] secondaryRecFields = + new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields]; + ISerializerDeserializer[] enforcedRecFields = + new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; + secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys]; + ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; + ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider(); + ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider(); + if (numSecondaryKeys > 0) { + secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory( + isEnforcingKeyTypes ? enforcedItemType : itemType, index.getKeyFieldNames().get(0), + numPrimaryKeys); + Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0), + index.getKeyFieldNames().get(0), itemType); + secondaryKeyType = keyTypePair.first; + anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second; + ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType); + secondaryRecFields[0] = keySerde; + secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType); + } + if (numFilterFields > 0) { + secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat() + .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys); + Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); + IAType type = keyTypePair.first; + ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type); + secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde; + } + secondaryRecDesc = new RecordDescriptor(secondaryRecFields); + // Comparators and type traits for tokens. + int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1; + tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields]; + tokenTypeTraits = new ITypeTraits[numTokenFields]; + tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); + tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType); + if (isPartitioned) { + // The partitioning field is hardcoded to be a short *without* an Asterix type tag. + tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); + tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS; + } + // Set tokenizer factory. + // TODO: We might want to expose the hashing option at the AQL level, + // and add the choice to the index metadata. 
+ tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType, + index.getGramLength()); + // Type traits for inverted-list elements. Inverted lists contain + // primary keys. + invListsTypeTraits = new ITypeTraits[numPrimaryKeys]; + if (numPrimaryKeys > 0) { + invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; + enforcedRecFields[0] = primaryRecDesc.getFields()[0]; + enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0]; + } + enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType); + enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); + // For tokenization, sorting and loading. + // One token (+ optional partitioning field) + primary keys. + numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys; + ISerializerDeserializer[] tokenKeyPairFields = + new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields]; + ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields]; + tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields]; + tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType); + tokenKeyPairTypeTraits[0] = tokenTypeTraits[0]; + tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType); + int pkOff = 1; + if (isPartitioned) { + tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE; + tokenKeyPairTypeTraits[1] = tokenTypeTraits[1]; + tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY); + pkOff = 2; + } + if (numPrimaryKeys > 0) { + tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0]; + tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0]; + tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0]; + } + if (numFilterFields > 0) { + tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys]; + } + tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits); + if (filterFieldName != null) { + invertedIndexFields = new int[numTokenKeyPairFields]; + for (int i = 0; i < invertedIndexFields.length; i++) { + invertedIndexFields[i] = i; + } + secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields]; + secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys; + invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys]; + for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) { + invertedIndexFieldsForNonBulkLoadOps[i] = i; + } + } + } + + @Override + protected int getNumSecondaryKeys() { + return numTokenKeyPairFields - numPrimaryKeys; + } + + @Override + public JobSpecification buildCreationJobSpec() throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + //prepare a LocalResourceMetadata which will be stored in NC's local resource repository + IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits, + primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned, + dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, + filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps, + 
invertedIndexFieldsForNonBulkLoadOps, dataset.getIndexOperationTrackerFactory(index), + dataset.getIoOperationCallbackFactory(index), + storageComponentProvider.getMetadataPageManagerFactory()); + ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider( + localResourceMetadata, LocalResource.LSMInvertedIndexResource); + + IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); + LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = + new LSMInvertedIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(), + secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(), + tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, + tokenizerFactory, dataflowHelperFactory, localResourceFactoryProvider, + dataset.getModificationCallbackFactory(storageComponentProvider, index, null, + IndexOperation.CREATE, null), + storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp, + secondaryPartitionConstraint); + spec.addRoot(invIndexCreateOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + @Override + public JobSpecification buildLoadingJobSpec() throws AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + + // Create dummy key provider for feeding the primary index scan. + AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); + + // Create primary index scan op. + BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); + + AbstractOperatorDescriptor sourceOp = primaryScanOp; + boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds(); + int numSecondaryKeys = index.getKeyFieldNames().size(); + if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { + sourceOp = createCastOp(spec, dataset.getDatasetType()); + spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); + } + AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, numSecondaryKeys, secondaryRecDesc); + + // If any of the secondary fields are nullable, then add a select op + // that filters nulls. + AlgebricksMetaOperatorDescriptor selectOp = null; + if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { + selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc); + } + + // Create a tokenizer op. + AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec); + + // Sort by token + primary keys. + ExternalSortOperatorDescriptor sortOp = + createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc); + + // Create secondary inverted index bulk load op. + LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec); + + AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, + new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); + // Connect the operators. 
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); + if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { + spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0); + } else { + spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0); + } + spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0); + spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0); + spec.addRoot(metaOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } + + private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException { + int docField = 0; + int numSecondaryKeys = index.getKeyFieldNames().size(); + int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields]; + for (int i = 0; i < primaryKeyFields.length; i++) { + primaryKeyFields[i] = numSecondaryKeys + i; + } + BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, + tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned, false); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp, + primaryPartitionConstraint); + return tokenizerOp; + } + + @Override + protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec, + IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) { + // Sort on token and primary keys. 
+ int[] sortFields = new int[numTokenKeyPairFields]; + for (int i = 0; i < numTokenKeyPairFields; i++) { + sortFields[i] = i; + } + ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec, + physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint); + return sortOp; + } + + private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) + throws AlgebricksException { + int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields]; + for (int i = 0; i < fieldPermutation.length; i++) { + fieldPermutation[i] = i; + } + IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor( + spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false, + storageComponentProvider.getStorageManager(), secondaryFileSplitProvider, + storageComponentProvider.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, + invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory, + storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp, + secondaryPartitionConstraint); + return invIndexBulkLoadOp; + } + + private IIndexDataflowHelperFactory createDataflowHelperFactory() throws AlgebricksException { + return dataset.getIndexDataflowHelperFactory(metadataProvider, index, itemType, metaType, mergePolicyFactory, + mergePolicyFactoryProperties); + } + + @Override + public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { + JobSpecification spec = RuntimeUtils.createJobSpecification(); + IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory(); + IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); + LSMInvertedIndexCompactOperator compactOp = + new LSMInvertedIndexCompactOperator(spec, storageComponentProvider.getStorageManager(), + secondaryFileSplitProvider, storageComponentProvider.getIndexLifecycleManagerProvider(), + tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, + tokenizerFactory, dataflowHelperFactory, + dataset.getModificationCallbackFactory(storageComponentProvider, index, null, + IndexOperation.FULL_MERGE, null), + storageComponentProvider.getMetadataPageManagerFactory()); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, + secondaryPartitionConstraint); + + spec.addRoot(compactOp); + spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); + return spec; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java new file mode 100644 index 
0000000..460b635 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.metadata.utils; + +import java.util.List; + +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.transactions.IResourceFactory; +import org.apache.asterix.external.indexing.IndexingConstants; +import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; +import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadataFactory; +import org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadataFactory; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; +import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.api.dataflow.IOperatorDescriptor; +import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITypeTraits; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobSpecification; 
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
+import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+
+@SuppressWarnings("rawtypes")
+public class SecondaryRTreeOperationsHelper extends SecondaryIndexOperationsHelper {
+
+    protected IPrimitiveValueProviderFactory[] valueProviderFactories;
+    protected int numNestedSecondaryKeyFields;
+    protected ATypeTag keyType;
+    protected int[] primaryKeyFields;
+    protected int[] rtreeFields;
+    protected boolean isPointMBR;
+    protected RecordDescriptor secondaryRecDescForPointMBR = null;
+
+    protected SecondaryRTreeOperationsHelper(Dataset dataset, Index index, PhysicalOptimizationConfig physOptConf,
+            IPropertiesProvider propertiesProvider, MetadataProvider metadataProvider, ARecordType recType,
+            ARecordType metaType, ARecordType enforcedType, ARecordType enforcedMetaType) {
+        super(dataset, index, physOptConf, propertiesProvider, metadataProvider, recType, metaType, enforcedType,
+                enforcedMetaType);
+    }
+
+    @Override
+    public JobSpecification buildCreationJobSpec() throws AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+        ILocalResourceFactoryProvider localResourceFactoryProvider;
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree();
+            //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+            IResourceFactory localResourceMetadata = new LSMRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
+                    secondaryComparatorFactories, btreeCompFactories, valueProviderFactories, RTreePolicyType.RTREE,
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
+                    filterCmpFactories, rtreeFields, primaryKeyFields, secondaryFilterFields, isPointMBR,
+                    dataset.getIndexOperationTrackerFactory(index), dataset.getIoOperationCallbackFactory(index),
+                    storageComponentProvider.getMetadataPageManagerFactory());
+            localResourceFactoryProvider =
+                    new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMRTreeResource);
+        } else {
+            // External dataset
+            // Prepare a LocalResourceMetadata which will be stored in NC's local resource repository
+            IResourceFactory localResourceMetadata = new ExternalRTreeLocalResourceMetadataFactory(secondaryTypeTraits,
+                    secondaryComparatorFactories, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(),
+                    valueProviderFactories, RTreePolicyType.RTREE,
+                    MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                    dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, primaryKeyFields,
+                    isPointMBR, dataset.getIndexOperationTrackerFactory(index),
+                    dataset.getIoOperationCallbackFactory(index),
+                    storageComponentProvider.getMetadataPageManagerFactory());
+            localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
+                    LocalResource.ExternalRTreeResource);
+        }
+
+        TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp =
+                new TreeIndexCreateOperatorDescriptor(spec, storageComponentProvider.getStorageManager(),
+                        storageComponentProvider.getIndexLifecycleManagerProvider(), secondaryFileSplitProvider,
+                        secondaryTypeTraits, secondaryComparatorFactories, null, indexDataflowHelperFactory,
+                        localResourceFactoryProvider,
+                        dataset.getModificationCallbackFactory(storageComponentProvider, index, null,
+                                IndexOperation.CREATE, null),
+                        storageComponentProvider.getMetadataPageManagerFactory());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(secondaryIndexCreateOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+
+    private IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree() {
+        IBinaryComparatorFactory[] btreeCompFactories = new IBinaryComparatorFactory[secondaryTypeTraits.length];
+        int i = 0;
+        for (; i < secondaryComparatorFactories.length; i++) {
+            btreeCompFactories[i] = secondaryComparatorFactories[i];
+        }
+        for (int j = 0; i < secondaryTypeTraits.length; i++, j++) {
+            btreeCompFactories[i] = primaryComparatorFactories[j];
+        }
+        return btreeCompFactories;
+    }
+
+    @Override
+    protected int getNumSecondaryKeys() {
+        return numNestedSecondaryKeyFields;
+    }
+
+    @Override
+    protected void setSecondaryRecDescAndComparators() throws AlgebricksException {
+        List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
+        int numSecondaryKeys = secondaryKeyFields.size();
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        if (numSecondaryKeys != 1) {
+            throw new AsterixException("Cannot use " + numSecondaryKeys + " fields as a key for the R-tree index. "
+                    + "There can be only one field as a key for the R-tree index.");
+        }
+        Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableOpenFieldType(index.getKeyFieldTypes().get(0),
+                secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
+        if (spatialType == null) {
+            throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+        }
+        isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+        numNestedSecondaryKeyFields = numDimensions * 2;
+        int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0;
+        secondaryFieldAccessEvalFactories =
+                metadataProvider.getFormat().createMBRFactory(isEnforcingKeyTypes ? enforcedItemType : itemType,
+                        secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName);
+        secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        ISerializerDeserializer[] secondaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields];
+        ISerializerDeserializer[] enforcedRecFields =
+                new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
+        secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
+        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        keyType = nestedKeyType.getTypeTag();
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
+            ISerializerDeserializer keySerde =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType);
+            secondaryRecFields[i] = keySerde;
+            secondaryComparatorFactories[i] =
+                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true);
+            secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] =
+                    metadataProvider.getStorageComponentProvider().getPrimitiveValueProviderFactory();
+
+        }
+        // Add serializers and comparators for primary index fields.
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
+                enforcedRecFields[i] = primaryRecDesc.getFields()[i];
+                enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
+            }
+        } else {
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i);
+                secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i);
+                enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i);
+                enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i);
+            }
+        }
+        enforcedRecFields[numPrimaryKeys] =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
+        if (numFilterFields > 0) {
+            rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys];
+            for (int i = 0; i < rtreeFields.length; i++) {
+                rtreeFields[i] = i;
+            }
+
+            Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
+            IAType type = typePair.first;
+            ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type);
+            secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde;
+        }
+        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
+        primaryKeyFields = new int[numPrimaryKeys];
+        for (int i = 0; i < primaryKeyFields.length; i++) {
+            primaryKeyFields[i] = i + numNestedSecondaryKeyFields;
+        }
+        if (isPointMBR) {
+            int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2;
+            ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys
+                    + numNestedSecondaryKeyFieldForPointMBR + numFilterFields];
+            int idx = 0;
+            for (int i = 0; i < numNestedSecondaryKeyFieldForPointMBR; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[i];
+            }
+            for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) {
+                recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i];
+            }
+            secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR);
+        }
+    }
+
+    @Override
+    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
+        /***************************************************
+         * [ About PointMBR Optimization ]
+         * Instead of storing an MBR (4 doubles) for a point (2 doubles) in an RTree leaf node,
+         * the PointMBR concept is introduced.
+         * PointMBR is a way to store a point as 2 doubles in an RTree leaf node.
+         * This reduces the RTree index size roughly by half.
+         * In order to fully benefit from the PointMBR concept, besides the RTree,
+         * the external sort operator used during bulk-loading (from either data loading or index creation)
+         * must deal with a point as 2 doubles instead of 4 doubles. Otherwise, the external sort will
+         * handle twice as many doubles as it actually requires. For this purpose,
+         * PointMBR-specific optimization logic is added as follows:
+         * 1) The CreateMBR function in the assign operator generates 2 doubles, instead of 4 doubles.
+         * 2) The external sort operator sorts points represented with 2 doubles.
+         * 3) Bulk-loading in the RTree takes 4 doubles by reading 2 doubles twice and then
+         * does the same work as in the non-point MBR cases.
+         ***************************************************/
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields);
+        int numNestedSecondaryKeFieldsConsideringPointMBR =
+                isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields;
+        RecordDescriptor secondaryRecDescConsideringPointMBR =
+                isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc;
+        boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            // Create dummy key provider for feeding the primary index scan.
+            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+
+            // Create primary index scan op.
+            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+
+            // Assign op.
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec,
+                    numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR,
+                        secondaryRecDescConsideringPointMBR);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] {
+                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
+                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
+            // Create secondary RTree bulk load op.
+            TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation,
+                    indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+            AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                    new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
+            // Connect the operators.
+            spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+            spec.addRoot(metaOp);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        } else {
+            // External dataset
+            /*
+             * In case of external data, this method is used to build loading jobs for both
+             * the initial load on index creation
+             * and the transaction load on dataset refresh.
+             */
+            // Create the external indexing scan operator.
+            ExternalScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec);
+            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
+                sourceOp = createCastOp(spec, dataset.getDatasetType());
+                spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
+            }
+            // Assign op.
+            AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec,
+                    numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR);
+
+            // If any of the secondary fields are nullable, then add a select op that filters nulls.
+            AlgebricksMetaOperatorDescriptor selectOp = null;
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR,
+                        secondaryRecDescConsideringPointMBR);
+            }
+
+            // Sort by secondary keys.
+            ExternalSortOperatorDescriptor sortOp = createSortOp(spec,
+                    new IBinaryComparatorFactory[] {
+                            MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length) },
+                    isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc);
+            // Create secondary RTree bulk load op.
+            IOperatorDescriptor root;
+            AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp;
+            if (externalFiles != null) {
+                // Transaction load
+                secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation,
+                        indexDataflowHelperFactory, GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                root = secondaryBulkLoadOp;
+            } else {
+                // Initial load
+                secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, indexDataflowHelperFactory,
+                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR);
+                AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                        new IPushRuntimeFactory[] { new SinkRuntimeFactory() },
+                        new RecordDescriptor[] { secondaryRecDesc });
+                spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0);
+                root = metaOp;
+            }
+
+            spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
+            if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+                spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+            } else {
+                spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+            }
+            spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
+            spec.addRoot(root);
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        }
+        return spec;
+    }
+
+    protected int[] createFieldPermutationForBulkLoadOp(int numSecondaryKeyFields) {
+        int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
+        int numSecondaryKeyFieldsForPointMBR = numSecondaryKeyFields / 2;
+        int end = isPointMBR ? numSecondaryKeyFieldsForPointMBR : fieldPermutation.length;
+        for (int i = 0; i < end; i++) {
+            fieldPermutation[i] = i;
+        }
+        if (isPointMBR) {
+            /*******************************************************************************
+             * For example, suppose that 2d point type data is indexed using an RTree, there are no
+             * filter fields, and the primary key consists of a single field.
+             * ========== Without PointMBR optimization ==========
+             * Without the point type optimization, the input operator of the RTree's TreeIndexBulkLoadOperator
+             * delivers five variables to the TreeIndexBulkLoadOperator as follows:
+             * [$var1, $var2, $var3, $var4, $var5]
+             * where $var1 ~ $var4 together represent an MBR of a point object.
+             * Since it is a point object, $var1 and $var3 always have identical values, and so do $var2 and $var4.
+             * $var5 represents a primary key value.
+             * The fieldPermutation variable captures this order correctly by putting values in the array as follows:
+             * [0,1,2,3,4]
+             * =========== With PointMBR optimization ===========
+             * With the PointMBR optimization, the input operator of the RTree's TreeIndexBulkLoadOperator
+             * delivers 3 variables to the TreeIndexBulkLoadOperator as follows:
+             * [$var1, $var2, $var3]
+             * where $var1 and $var2 together represent an MBR of a point object.
+             * $var3 represents a primary key value.
+             * The fieldPermutation variable captures this order correctly by putting values in the array as follows:
+             * [0,1,0,1,2]
+             * This means that the bulkload op reads the pair of $var1 and $var2 twice in order to provide the same
+             * output as when the PointMBR optimization is not applied.
+             * This adjustment is done in the code of this if clause.
+             *********************************************************************************/
+            int idx = numSecondaryKeyFieldsForPointMBR;
+            //add the rest of the sk fields for pointMBR
+            for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) {
+                fieldPermutation[idx++] = i;
+            }
+            //add the pk and filter fields
+            end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + numFilterFields;
+            for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) {
+                fieldPermutation[idx++] = i;
+            }
+        }
+        return fieldPermutation;
+    }
+
+    @Override
+    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
+        JobSpecification spec = RuntimeUtils.createJobSpecification();
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
+                metadataProvider, index, itemType, metaType, mergePolicyFactory, mergePolicyFactoryProperties);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
+                metadataProvider.getStorageComponentProvider().getStorageManager(),
+                metadataProvider.getStorageComponentProvider().getIndexLifecycleManagerProvider(),
+                secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories,
+                secondaryBloomFilterKeyFields, indexDataflowHelperFactory,
+                dataset.getModificationCallbackFactory(metadataProvider.getStorageComponentProvider(), index, null,
+                        IndexOperation.FULL_MERGE, null),
+                metadataProvider.getStorageComponentProvider().getMetadataPageManagerFactory());
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                secondaryPartitionConstraint);
+        spec.addRoot(compactOp);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        return spec;
+    }
+}
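Reviewer note: the [0,1,0,1,2] permutation described in the createFieldPermutationForBulkLoadOp comment can be reproduced in isolation. The standalone snippet below is a hypothetical illustration only (the class name and the hard-coded sizes are not part of the patch); it re-implements the same loop structure for a 2D point, a single primary-key field, and no filter fields, assuming the PointMBR path is taken:

    // Standalone illustration (not part of the patch) of the field permutation
    // computed for the RTree bulk-load operator under the PointMBR optimization.
    public class PointMBRPermutationDemo {
        public static void main(String[] args) {
            int numSecondaryKeyFields = 4; // 2 dimensions * 2 (an MBR)
            int numPrimaryKeys = 1;
            int numFilterFields = 0;
            boolean isPointMBR = true;

            int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
            int half = numSecondaryKeyFields / 2;
            int end = isPointMBR ? half : fieldPermutation.length;
            for (int i = 0; i < end; i++) {
                fieldPermutation[i] = i;
            }
            if (isPointMBR) {
                int idx = half;
                // read the two point coordinates a second time to rebuild the 4-double MBR
                for (int i = 0; i < half; i++) {
                    fieldPermutation[idx++] = i;
                }
                // then append the primary key (and filter) fields
                for (int i = half; i < half + numPrimaryKeys + numFilterFields; i++) {
                    fieldPermutation[idx++] = i;
                }
            }
            System.out.println(java.util.Arrays.toString(fieldPermutation)); // prints [0, 1, 0, 1, 2]
        }
    }

Reading positions 0 and 1 twice is what lets TreeIndexBulkLoadOperator see the full 4-double MBR again, so the RTree code downstream of the sort is unchanged by the optimization.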

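On the same note, the ordering produced by getComparatorFactoriesForDeletedKeyBTree() may be easier to see with placeholder values. The sketch below is illustrative only, with plain strings standing in for IBinaryComparatorFactory instances (all names are made up):

    // Illustrative layout of the deleted-key BTree comparators:
    // the secondary (MBR) comparators first, then the primary-key
    // comparators that fill out the remaining positions.
    public class DeletedKeyBTreeComparatorLayoutDemo {
        public static void main(String[] args) {
            String[] secondaryCmps = { "x-cmp", "y-cmp" }; // MBR key comparators
            String[] primaryCmps = { "pk-cmp" };           // primary key comparators
            String[] btreeCmps = new String[secondaryCmps.length + primaryCmps.length];
            int i = 0;
            for (; i < secondaryCmps.length; i++) {
                btreeCmps[i] = secondaryCmps[i];           // secondary keys first
            }
            for (int j = 0; i < btreeCmps.length; i++, j++) {
                btreeCmps[i] = primaryCmps[j];             // then primary keys
            }
            System.out.println(java.util.Arrays.toString(btreeCmps)); // [x-cmp, y-cmp, pk-cmp]
        }
    }

The deleted-key BTree therefore sorts by the MBR key fields first and breaks ties with the primary key, matching the layout of secondaryTypeTraits in the patch.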