http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
deleted file mode 100644
index c362e2e..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.file;
-
-import java.io.DataOutput;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.app.external.ExternalIndexingOperations;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.indexing.IndexingConstants;
-import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
-import org.apache.asterix.external.operators.ExternalIndexBulkModifyOperatorDescriptor;
-import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
-import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.formats.nontagged.TypeTraitProvider;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
-import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IJobletEventListenerFactory;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-
-@SuppressWarnings("rawtypes")
-// TODO: We should eventually have a hierarchy of classes that can create all
-// possible index job specs,
-// not just for creation.
-public abstract class SecondaryIndexOperationsHelper {
-    protected final PhysicalOptimizationConfig physOptConf;
-
-    protected int numPrimaryKeys;
-    protected int numSecondaryKeys;
-    protected MetadataProvider metadataProvider;
-    protected String dataverseName;
-    protected String datasetName;
-    protected Dataset dataset;
-    protected ARecordType itemType;
-    protected ARecordType metaType;
-    protected List<Integer> keySourceIndicators;
-    protected ISerializerDeserializer metaSerde;
-    protected ISerializerDeserializer payloadSerde;
-    protected IFileSplitProvider primaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint primaryPartitionConstraint;
-    protected IFileSplitProvider secondaryFileSplitProvider;
-    protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
-    protected String secondaryIndexName;
-    protected boolean anySecondaryKeyIsNullable = false;
-    protected boolean isEnforcingKeyTypes = false;
-
-    protected long numElementsHint;
-    protected IBinaryComparatorFactory[] primaryComparatorFactories;
-    protected int[] primaryBloomFilterKeyFields;
-    protected RecordDescriptor primaryRecDesc;
-    protected IBinaryComparatorFactory[] secondaryComparatorFactories;
-    protected ITypeTraits[] secondaryTypeTraits;
-    protected int[] secondaryBloomFilterKeyFields;
-    protected RecordDescriptor secondaryRecDesc;
-    protected IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories;
-
-    protected IPropertiesProvider propertiesProvider;
-    protected ILSMMergePolicyFactory mergePolicyFactory;
-    protected Map<String, String> mergePolicyFactoryProperties;
-    protected RecordDescriptor enforcedRecDesc;
-    protected ARecordType enforcedItemType;
-    protected ARecordType enforcedMetaType;
-    protected int numFilterFields;
-    protected List<String> filterFieldName;
-    protected ITypeTraits[] filterTypeTraits;
-    protected IBinaryComparatorFactory[] filterCmpFactories;
-    protected int[] secondaryFilterFields;
-    protected int[] primaryFilterFields;
-    protected int[] primaryBTreeFields;
-    protected int[] secondaryBTreeFields;
-    protected List<ExternalFile> externalFiles;
-
-    // Prevent public construction. Should be created via createIndexCreator().
-    protected SecondaryIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
-            IPropertiesProvider propertiesProvider) {
-        this.physOptConf = physOptConf;
-        this.propertiesProvider = propertiesProvider;
-    }
-
-    public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
-            String datasetName, String indexName, List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes,
-            boolean isEnforced, int gramLength, MetadataProvider metadataProvider,
-            PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType metaType,
-            List<Integer> keySourceIndicators, ARecordType enforcedType) throws AsterixException, AlgebricksException {
-        IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE;
-        SecondaryIndexOperationsHelper indexOperationsHelper = null;
-        switch (indexType) {
-            case BTREE: {
-                indexOperationsHelper = new SecondaryBTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case RTREE: {
-                indexOperationsHelper = new SecondaryRTreeOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            case SINGLE_PARTITION_WORD_INVIX:
-            case SINGLE_PARTITION_NGRAM_INVIX:
-            case LENGTH_PARTITIONED_WORD_INVIX:
-            case LENGTH_PARTITIONED_NGRAM_INVIX: {
-                indexOperationsHelper =
-                        new SecondaryInvertedIndexOperationsHelper(physOptConf, asterixPropertiesProvider);
-                break;
-            }
-            default: {
-                throw new AsterixException("Unknown Index Type: " + indexType);
-            }
-        }
-        indexOperationsHelper.init(indexType, dataverseName, datasetName, indexName, secondaryKeyFields,
-                secondaryKeyTypes, isEnforced, gramLength, metadataProvider, recType, metaType, keySourceIndicators,
-                enforcedType);
-        return indexOperationsHelper;
-    }
-
-    public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
-
-    public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
-
-    public abstract JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException;
-
-    protected void init(IndexType indexType, String dvn, String dsn, String in, List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, MetadataProvider metadataProvider,
-            ARecordType aRecType, ARecordType metaType, List<Integer> keySourceIndicators, ARecordType enforcedType)
-            throws AsterixException, AlgebricksException {
-        this.metadataProvider = metadataProvider;
-        dataverseName = dvn == null ? metadataProvider.getDefaultDataverseName() : dvn;
-        datasetName = dsn;
-        secondaryIndexName = in;
-        isEnforcingKeyTypes = isEnforced;
-        dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Unknown dataset " + datasetName);
-        }
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        itemType = aRecType;
-        this.metaType = metaType;
-        this.keySourceIndicators = keySourceIndicators;
-        enforcedItemType = enforcedType;
-        payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
-        metaSerde = metaType == null ? null
-                : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
-        numSecondaryKeys = secondaryKeyFields.size();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
-        secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
-        secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
-
-        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            numPrimaryKeys = ExternalIndexingOperations.getRIDSize(dataset);
-        } else {
-            filterFieldName = DatasetUtils.getFilterField(dataset);
-            if (filterFieldName != null) {
-                numFilterFields = 1;
-            } else {
-                numFilterFields = 0;
-            }
-
-            numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
-            primaryFileSplitProvider = primarySplitsAndConstraint.first;
-            primaryPartitionConstraint = primarySplitsAndConstraint.second;
-            setPrimaryRecDescAndComparators();
-        }
-        setSecondaryRecDescAndComparators(indexType, secondaryKeyFields, secondaryKeyTypes, gramLength,
-                metadataProvider);
-        numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
-        mergePolicyFactory = compactionInfo.first;
-        mergePolicyFactoryProperties = compactionInfo.second;
-
-        if (numFilterFields > 0) {
-            setFilterTypeTraitsAndComparators();
-        }
-    }
-
-    protected void setFilterTypeTraitsAndComparators() throws AlgebricksException {
-        filterTypeTraits = new ITypeTraits[numFilterFields];
-        filterCmpFactories = new IBinaryComparatorFactory[numFilterFields];
-        secondaryFilterFields = new int[numFilterFields];
-        primaryFilterFields = new int[numFilterFields];
-        primaryBTreeFields = new int[numPrimaryKeys + 1];
-        secondaryBTreeFields = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < primaryBTreeFields.length; i++) {
-            primaryBTreeFields[i] = i;
-        }
-        for (int i = 0; i < secondaryBTreeFields.length; i++) {
-            secondaryBTreeFields[i] = i;
-        }
-
-        IAType type = itemType.getSubFieldType(filterFieldName);
-        filterCmpFactories[0] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(type, true);
-        filterTypeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
-        secondaryFilterFields[0] = getNumSecondaryKeys() + numPrimaryKeys;
-        primaryFilterFields[0] = numPrimaryKeys + 1;
-    }
-
-    protected abstract int getNumSecondaryKeys();
-
-    protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
-        List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        int numPrimaryKeys = partitioningKeys.size();
-        ISerializerDeserializer[] primaryRecFields =
-                new ISerializerDeserializer[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
-        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (dataset.hasMetaPart() ? 1 : 0)];
-        primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-        primaryBloomFilterKeyFields = new int[numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
-        List<Integer> indicators = null;
-        if (dataset.hasMetaPart()) {
-            indicators = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType =
-                    (indicators == null || indicators.get(i) == 0) ? itemType.getSubFieldType(partitioningKeys.get(i))
-                            : metaType.getSubFieldType(partitioningKeys.get(i));
-            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
-            primaryComparatorFactories[i] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
-            primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            primaryBloomFilterKeyFields[i] = i;
-        }
-        primaryRecFields[numPrimaryKeys] = payloadSerde;
-        primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        if (dataset.hasMetaPart()) {
-            primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
-            primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        }
-        primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
-    }
-
-    protected abstract void setSecondaryRecDescAndComparators(IndexType indexType,
-            List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes, int gramLength,
-            MetadataProvider metadataProvider) throws AlgebricksException, AsterixException;
-
-    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec)
-            throws AsterixException, AlgebricksException {
-        // Build dummy tuple containing one field with a dummy value inside.
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        try {
-            // Serialize dummy value into a field.
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-        // Add dummy field.
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                primaryPartitionConstraint);
-        return keyProviderOp;
-    }
-
-    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        // -Infinity
-        int[] lowKeyFields = null;
-        // +Infinity
-        int[] highKeyFields = null;
-        ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
-        JobId jobId = JobIdFactory.generateJobId();
-        metadataProvider.setJobId(jobId);
-        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
-        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
-                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
-        StorageProperties storageProperties = propertiesProvider.getStorageProperties();
-        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
-                primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
-                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        mergePolicyFactory, mergePolicyFactoryProperties,
-                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                        primaryBTreeFields, primaryFilterFields, !temp),
-                false, false, null, searchCallbackFactory, null, null, LSMIndexUtil
-                        .getMetadataPageManagerFactory());
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primaryPartitionConstraint);
-        return primarySearchOp;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
-            AbstractOperatorDescriptor primaryScanOp, int numSecondaryKeyFields, RecordDescriptor secondaryRecDesc)
-            throws AlgebricksException {
-        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
-        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
-        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
-            outColumns[i] = numPrimaryKeys + i;
-        }
-        int projCount = 0;
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            projectionList[projCount++] = numPrimaryKeys + i;
-        }
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[projCount++] = i;
-        }
-        if (numFilterFields > 0) {
-            projectionList[projCount++] = numPrimaryKeys + numSecondaryKeyFields;
-        }
-
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-            sefs[i] = secondaryFieldAccessEvalFactories[i];
-        }
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
-                primaryPartitionConstraint);
-        return asterixAssignOp;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor createCastOp(JobSpecification spec, DatasetType dsType) {
-        CastTypeDescriptor castFuncDesc = (CastTypeDescriptor) CastTypeDescriptor.FACTORY.createFunctionDescriptor();
-        castFuncDesc.setImmutableStates(enforcedItemType, itemType);
-
-        int[] outColumns = new int[1];
-        int[] projectionList = new int[(dataset.hasMetaPart() ? 2 : 1) + numPrimaryKeys];
-        int recordIdx;
-        //external datascan operator returns a record as the first field, instead of the last in internal case
-        if (dsType == DatasetType.EXTERNAL) {
-            recordIdx = 0;
-            outColumns[0] = 0;
-        } else {
-            recordIdx = numPrimaryKeys;
-            outColumns[0] = numPrimaryKeys;
-        }
-        for (int i = 0; i <= numPrimaryKeys; i++) {
-            projectionList[i] = i;
-        }
-        if (dataset.hasMetaPart()) {
-            projectionList[numPrimaryKeys + 1] = numPrimaryKeys + 1;
-        }
-        IScalarEvaluatorFactory[] castEvalFact =
-                new IScalarEvaluatorFactory[] { new ColumnAccessEvalFactory(recordIdx) };
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[1];
-        sefs[0] = castFuncDesc.createEvaluatorFactory(castEvalFact);
-        AssignRuntimeFactory castAssign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor castRecAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { castAssign }, new RecordDescriptor[] { enforcedRecDesc });
-
-        return castRecAssignOp;
-    }
-
-    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
-            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
-        int[] sortFields = new int[secondaryComparatorFactories.length];
-        for (int i = 0; i < secondaryComparatorFactories.length; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
-        return sortOp;
-    }
-
-    protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
-            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
-        TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                secondaryRecDesc, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
-                secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
-                fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory, LSMIndexUtil
-                        .getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return treeIndexBulkLoadOp;
-    }
-
-    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields,
-            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
-        IScalarEvaluatorFactory[] andArgsEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeyFields];
-        NotDescriptor notDesc = new NotDescriptor();
-        IsUnknownDescriptor isUnknownDesc = new IsUnknownDescriptor();
-        for (int i = 0; i < numSecondaryKeyFields; i++) {
-            // Access column i, and apply 'is not null'.
-            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
-            IScalarEvaluatorFactory isUnknownEvalFactory =
-                    isUnknownDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { columnAccessEvalFactory });
-            IScalarEvaluatorFactory notEvalFactory =
-                    notDesc.createEvaluatorFactory(new IScalarEvaluatorFactory[] { isUnknownEvalFactory });
-            andArgsEvalFactories[i] = notEvalFactory;
-        }
-        IScalarEvaluatorFactory selectCond = null;
-        if (numSecondaryKeyFields > 1) {
-            // Create conjunctive condition where all secondary index keys must
-            // satisfy 'is not null'.
-            AndDescriptor andDesc = new AndDescriptor();
-            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
-        } else {
-            selectCond = andArgsEvalFactories[0];
-        }
-        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(selectCond, null,
-                BinaryBooleanInspector.FACTORY, false, -1, null);
-        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
-                primaryPartitionConstraint);
-        return asterixSelectOp;
-    }
-
-    // This method creates a source indexing operator for external data
-    protected ExternalDataScanOperatorDescriptor createExternalIndexingOp(JobSpecification spec)
-            throws AlgebricksException, AsterixException {
-        // A record + primary keys
-        ISerializerDeserializer[] serdes = new ISerializerDeserializer[1 + numPrimaryKeys];
-        ITypeTraits[] typeTraits = new ITypeTraits[1 + numPrimaryKeys];
-        // payload serde and type traits for the record slot
-        serdes[0] = payloadSerde;
-        typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
-        // serdes and type traits for rid fields
-        for (int i = 1; i < serdes.length; i++) {
-            serdes[i] = IndexingConstants.getSerializerDeserializer(i - 1);
-            typeTraits[i] = IndexingConstants.getTypeTraits(i - 1);
-        }
-        // output record desc
-        RecordDescriptor indexerDesc = new RecordDescriptor(serdes, typeTraits);
-
-        // Create the operator and its partition constraints
-        Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> indexingOpAndConstraints;
-        try {
-            indexingOpAndConstraints = ExternalIndexingOperations.createExternalIndexingOp(spec, metadataProvider,
-                    dataset, itemType, indexerDesc, externalFiles);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
-                indexingOpAndConstraints.second);
-
-        // Set the primary partition constraints to this partition constraints
-        primaryPartitionConstraint = indexingOpAndConstraints.second;
-        return indexingOpAndConstraints.first;
-    }
-
-    protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec, int numSecondaryKeys,
-            RecordDescriptor secondaryRecDesc) throws AlgebricksException {
-        int[] outColumns = new int[numSecondaryKeys];
-        int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            outColumns[i] = i + numPrimaryKeys + 1;
-            projectionList[i] = i + numPrimaryKeys + 1;
-        }
-
-        IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
-        for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
-            sefs[i] = secondaryFieldAccessEvalFactories[i];
-        }
-        //add External RIDs to the projection list
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            projectionList[numSecondaryKeys + i] = i + 1;
-        }
-
-        AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
-        AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
-                new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
-        return asterixAssignOp;
-    }
-
-    protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
-            int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
-        // create a list of file ids
-        int numOfDeletedFiles = 0;
-        for (ExternalFile file : externalFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                numOfDeletedFiles++;
-            }
-        }
-        int[] deletedFiles = new int[numOfDeletedFiles];
-        int i = 0;
-        for (ExternalFile file : externalFiles) {
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                deletedFiles[i++] = file.getFileNumber();
-            }
-        }
-        ExternalIndexBulkModifyOperatorDescriptor treeIndexBulkLoadOp = new ExternalIndexBulkModifyOperatorDescriptor(
-                spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, secondaryTypeTraits,
-                secondaryComparatorFactories, secondaryBloomFilterKeyFields, dataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE, deletedFiles, fieldPermutation, fillFactor, numElementsHint,
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return treeIndexBulkLoadOp;
-    }
-
-    public List<ExternalFile> getExternalFiles() {
-        return externalFiles;
-    }
-
-    public void setExternalFiles(List<ExternalFile> externalFiles) {
-        this.externalFiles = externalFiles;
-    }
-}
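
A note for readers tracing the removed helper: the index arithmetic in createAssignOp above is easy to misread, so the following standalone sketch (not part of this patch; the key counts are hypothetical) replays its computation. Tuples from the primary scan arrive as [pk_0..pk_{n-1}, record, (filter)]; the assign evaluates the secondary keys into columns starting at numPrimaryKeys, and the projection emits [secondary keys..., primary keys..., (filter)], the order the downstream sort and bulk load consume.

public final class AssignLayoutSketch {
    public static void main(String[] args) {
        int numPrimaryKeys = 2;        // hypothetical
        int numSecondaryKeyFields = 1; // hypothetical
        int numFilterFields = 1;       // hypothetical

        // Same index arithmetic as the deleted createAssignOp.
        int[] outColumns = new int[numSecondaryKeyFields + numFilterFields];
        int[] projectionList = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
        for (int i = 0; i < numSecondaryKeyFields + numFilterFields; i++) {
            outColumns[i] = numPrimaryKeys + i;
        }
        int projCount = 0;
        for (int i = 0; i < numSecondaryKeyFields; i++) {
            projectionList[projCount++] = numPrimaryKeys + i; // secondary keys first
        }
        for (int i = 0; i < numPrimaryKeys; i++) {
            projectionList[projCount++] = i; // then primary keys
        }
        if (numFilterFields > 0) {
            projectionList[projCount++] = numPrimaryKeys + numSecondaryKeyFields; // filter last
        }
        // Prints outColumns = [2, 3] and projectionList = [2, 0, 1, 3].
        System.out.println("outColumns = " + java.util.Arrays.toString(outColumns));
        System.out.println("projectionList = " + java.util.Arrays.toString(projectionList));
    }
}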
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
deleted file mode 100644
index 3f5a9e9..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.file;
-
-import java.util.List;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import org.apache.asterix.common.transactions.IResourceFactory;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.NonTaggedFormatUtil;
-import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.util.RuntimeComponentsProvider;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadataFactory;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
-import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCompactOperator;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
-public class SecondaryInvertedIndexOperationsHelper extends SecondaryIndexOperationsHelper {
-
-    private IAType secondaryKeyType;
-    private ITypeTraits[] invListsTypeTraits;
-    private IBinaryComparatorFactory[] tokenComparatorFactories;
-    private ITypeTraits[] tokenTypeTraits;
-    private IBinaryTokenizerFactory tokenizerFactory;
-    // For tokenization, sorting and loading. Represents <token, primary keys>.
-    private int numTokenKeyPairFields;
-    private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
-    private RecordDescriptor tokenKeyPairRecDesc;
-    private boolean isPartitioned;
-    private int[] invertedIndexFields;
-    private int[] invertedIndexFieldsForNonBulkLoadOps;
-    private int[] secondaryFilterFieldsForNonBulkLoadOps;
-
-    protected SecondaryInvertedIndexOperationsHelper(PhysicalOptimizationConfig physOptConf,
-            IPropertiesProvider propertiesProvider) {
-        super(physOptConf, propertiesProvider);
-    }
-
-    @Override
-    @SuppressWarnings("rawtypes")
-    protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, int gramLength, MetadataProvider metadata)
-            throws AlgebricksException, AsterixException {
-        // Sanity checks.
-        if (numPrimaryKeys > 1) {
-            throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
-        }
-        if (numSecondaryKeys > 1) {
-            throw new AsterixException("Cannot create composite inverted index on multiple fields.");
-        }
-        if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-            isPartitioned = true;
-        } else {
-            isPartitioned = false;
-        }
-        // Prepare record descriptor used in the assign op, and the optional
-        // select op.
-        secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
-        ISerializerDeserializer[] secondaryRecFields =
-                new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys + numFilterFields];
-        ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields];
-        secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
-        ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys];
-        ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
-        ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
-        if (numSecondaryKeys > 0) {
-            secondaryFieldAccessEvalFactories[0] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
-                    isEnforcingKeyTypes ? enforcedItemType : itemType, secondaryKeyFields.get(0), numPrimaryKeys);
-            Pair<IAType, Boolean> keyTypePair =
-                    Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), itemType);
-            secondaryKeyType = keyTypePair.first;
-            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
-            ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
-            secondaryRecFields[0] = keySerde;
-            secondaryTypeTraits[0] = typeTraitProvider.getTypeTrait(secondaryKeyType);
-        }
-        if (numFilterFields > 0) {
-            secondaryFieldAccessEvalFactories[numSecondaryKeys] = FormatUtils.getDefaultFormat()
-                    .getFieldAccessEvaluatorFactory(itemType, filterFieldName, numPrimaryKeys);
-            Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType);
-            IAType type = keyTypePair.first;
-            ISerializerDeserializer serde = serdeProvider.getSerializerDeserializer(type);
-            secondaryRecFields[numPrimaryKeys + numSecondaryKeys] = serde;
-        }
-        secondaryRecDesc = new RecordDescriptor(secondaryRecFields);
-        // Comparators and type traits for tokens.
-        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
-        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
-        tokenTypeTraits = new ITypeTraits[numTokenFields];
-        tokenComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
-        tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
-        if (isPartitioned) {
-            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
-            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
-            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
-        }
-        // Set tokenizer factory.
-        // TODO: We might want to expose the hashing option at the AQL level,
-        // and add the choice to the index metadata.
-        tokenizerFactory =
-                NonTaggedFormatUtil.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(), indexType, gramLength);
-        // Type traits for inverted-list elements. Inverted lists contain
-        // primary keys.
-        invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
-        if (numPrimaryKeys > 0) {
-            invListsTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
-            enforcedRecFields[0] = primaryRecDesc.getFields()[0];
-            enforcedTypeTraits[0] = primaryRecDesc.getTypeTraits()[0];
-        }
-        enforcedRecFields[numPrimaryKeys] = serdeProvider.getSerializerDeserializer(itemType);
-        enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits);
-        // For tokenization, sorting and loading.
-        // One token (+ optional partitioning field) + primary keys.
-        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
-        ISerializerDeserializer[] tokenKeyPairFields =
-                new ISerializerDeserializer[numTokenKeyPairFields + numFilterFields];
-        ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
-        tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
-        tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
-        tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
-        tokenKeyPairComparatorFactories[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(secondaryKeyType);
-        int pkOff = 1;
-        if (isPartitioned) {
-            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
-            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
-            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
-            pkOff = 2;
-        }
-        if (numPrimaryKeys > 0) {
-            tokenKeyPairFields[pkOff] = primaryRecDesc.getFields()[0];
-            tokenKeyPairTypeTraits[pkOff] = primaryRecDesc.getTypeTraits()[0];
-            tokenKeyPairComparatorFactories[pkOff] = primaryComparatorFactories[0];
-        }
-        if (numFilterFields > 0) {
-            tokenKeyPairFields[numPrimaryKeys + pkOff] = secondaryRecFields[numPrimaryKeys + numSecondaryKeys];
-        }
-        tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
-        if (filterFieldName != null) {
-            invertedIndexFields = new int[numTokenKeyPairFields];
-            for (int i = 0; i < invertedIndexFields.length; i++) {
-                invertedIndexFields[i] = i;
-            }
-            secondaryFilterFieldsForNonBulkLoadOps = new int[numFilterFields];
-            secondaryFilterFieldsForNonBulkLoadOps[0] = numSecondaryKeys + numPrimaryKeys;
-            invertedIndexFieldsForNonBulkLoadOps = new int[numSecondaryKeys + numPrimaryKeys];
-            for (int i = 0; i < invertedIndexFieldsForNonBulkLoadOps.length; i++) {
-                invertedIndexFieldsForNonBulkLoadOps[i] = i;
-            }
-        }
-
-    }
-
-    @Override
-    protected int getNumSecondaryKeys() {
-        return numTokenKeyPairFields - numPrimaryKeys;
-    }
-
-    @Override
-    public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-        //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
-        IResourceFactory localResourceMetadata = new LSMInvertedIndexLocalResourceMetadataFactory(invListsTypeTraits,
-                primaryComparatorFactories, tokenTypeTraits, tokenComparatorFactories, tokenizerFactory, isPartitioned,
-                dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits,
-                filterCmpFactories, invertedIndexFields, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
-                invertedIndexFieldsForNonBulkLoadOps);
-        ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
-                localResourceMetadata, LocalResource.LSMInvertedIndexResource);
-
-        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
-        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp =
-                new LSMInvertedIndexCreateOperatorDescriptor(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        secondaryFileSplitProvider, RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits,
-                        tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                        dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
-                        LSMIndexUtil.getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
-                secondaryPartitionConstraint);
-        spec.addRoot(invIndexCreateOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    @Override
-    public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-        // Create dummy key provider for feeding the primary index scan.
-        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
-        // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
-
-        AbstractOperatorDescriptor sourceOp = primaryScanOp;
-        if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
-            sourceOp = createCastOp(spec, dataset.getDatasetType());
-            spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
-        }
-        AlgebricksMetaOperatorDescriptor asterixAssignOp =
-                createAssignOp(spec, sourceOp, numSecondaryKeys, secondaryRecDesc);
-
-        // If any of the secondary fields are nullable, then add a select op
-        // that filters nulls.
-        AlgebricksMetaOperatorDescriptor selectOp = null;
-        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys, secondaryRecDesc);
-        }
-
-        // Create a tokenizer op.
-        AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
-
-        // Sort by token + primary keys.
-        ExternalSortOperatorDescriptor sortOp =
-                createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
-
-        // Create secondary inverted index bulk load op.
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
-
-        AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
-                new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {});
-        // Connect the operators.
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0);
-        if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) {
-            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
-            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
-        } else {
-            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
-        }
-        spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, metaOp, 0);
-        spec.addRoot(metaOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
-    }
-
-    private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
-        int docField = 0;
-        int[] primaryKeyFields = new int[numPrimaryKeys + numFilterFields];
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            primaryKeyFields[i] = numSecondaryKeys + i;
-        }
-        BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc,
-                tokenizerFactory, docField, primaryKeyFields, isPartitioned, false);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
-                primaryPartitionConstraint);
-        return tokenizerOp;
-    }
-
-    @Override
-    protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
-            IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
-        // Sort on token and primary keys.
-        int[] sortFields = new int[numTokenKeyPairFields];
-        for (int i = 0; i < numTokenKeyPairFields; i++) {
-            sortFields[i] = i;
-        }
-        ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
-                physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
-        return sortOp;
-    }
-
-    private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
-        int[] fieldPermutation = new int[numTokenKeyPairFields + numFilterFields];
-        for (int i = 0; i < fieldPermutation.length; i++) {
-            fieldPermutation[i] = i;
-        }
-        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, secondaryRecDesc, fieldPermutation, false, numElementsHint, false,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider,
-                RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
-                LSMIndexUtil.getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
-                secondaryPartitionConstraint);
-        return invIndexBulkLoadOp;
-    }
-
-    private IIndexDataflowHelperFactory createDataflowHelperFactory() {
-        StorageProperties storageProperties = propertiesProvider.getStorageProperties();
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        if (!isPartitioned) {
-            return new LSMInvertedIndexDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
-                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
-                    invertedIndexFieldsForNonBulkLoadOps, !temp);
-        } else {
-            return new PartitionedLSMInvertedIndexDataflowHelperFactory(
-                    new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory,
-                    mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                    RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                    storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
-                    filterCmpFactories, secondaryFilterFields, secondaryFilterFieldsForNonBulkLoadOps,
-                    invertedIndexFieldsForNonBulkLoadOps, !temp);
        }
-    }
-
-    @Override
-    public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
-        JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-
-        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
-        LSMInvertedIndexCompactOperator compactOp =
-                new LSMInvertedIndexCompactOperator(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        secondaryFileSplitProvider, RuntimeComponentsProvider.RUNTIME_PROVIDER, tokenTypeTraits,
-                        tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                        dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil
-                                .getMetadataPageManagerFactory());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
-                secondaryPartitionConstraint);
-
-        spec.addRoot(compactOp);
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
-        return spec;
    }
-}
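
For readers following the removed loading path: buildLoadingJobSpec above wires the pipeline as key provider -> primary scan -> (cast, if enforcing types) -> assign -> (filter-nulls select) -> tokenizer -> external sort on token (+ partitioning short) + primary key -> inverted-index bulk load -> sink. The standalone sketch below (not from this patch; field names are illustrative only) spells out the <token, ...> tuple layout the tokenizer emits and the sorter and bulk load consume, mirroring numTokenKeyPairFields.

import java.util.ArrayList;
import java.util.List;

public final class TokenKeyPairLayoutSketch {
    // Mirrors the deleted helper's layout: token first, then (for
    // LENGTH_PARTITIONED_* indexes) the untagged short partitioning field,
    // then the primary key, then the optional filter field.
    static List<String> tokenKeyPairLayout(boolean isPartitioned, int numPrimaryKeys, int numFilterFields) {
        List<String> fields = new ArrayList<>();
        fields.add("token");
        if (isPartitioned) {
            fields.add("partitioningShort"); // hypothetical name for the short field
        }
        for (int i = 0; i < numPrimaryKeys; i++) {
            fields.add("pk" + i);
        }
        for (int i = 0; i < numFilterFields; i++) {
            fields.add("filter" + i);
        }
        return fields;
    }

    public static void main(String[] args) {
        // The sanity checks above allow at most one primary key and one secondary
        // key, so numTokenKeyPairFields is 2, or 3 when the index is partitioned.
        System.out.println(tokenKeyPairLayout(false, 1, 0)); // [token, pk0]
        System.out.println(tokenKeyPairLayout(true, 1, 1));  // [token, partitioningShort, pk0, filter0]
    }
}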
