http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java deleted file mode 100644 index c96ba4c..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.file; - -import java.util.List; - -import org.apache.asterix.app.external.ExternalIndexingOperations; -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.dataflow.LSMIndexUtil; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; -import org.apache.asterix.common.transactions.IResourceFactory; -import org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor; -import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.NonTaggedFormatUtil; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadataFactory; -import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; -import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; -import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; -import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeWithAntiMatterTuplesDataflowHelperFactory; -import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; -import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider; -import org.apache.hyracks.storage.common.file.LocalResource; - -@SuppressWarnings("rawtypes") -public class SecondaryRTreeOperationsHelper extends SecondaryIndexOperationsHelper { - - protected IPrimitiveValueProviderFactory[] valueProviderFactories; - protected int numNestedSecondaryKeyFields; - protected ATypeTag keyType; - protected int[] primaryKeyFields; - protected int[] rtreeFields; - protected boolean isPointMBR; - protected RecordDescriptor secondaryRecDescForPointMBR = null; - - protected SecondaryRTreeOperationsHelper(PhysicalOptimizationConfig physOptConf, - IPropertiesProvider propertiesProvider) { - super(physOptConf, propertiesProvider); - } - - @Override - public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - IIndexDataflowHelperFactory indexDataflowHelperFactory; - ILocalResourceFactoryProvider localResourceFactoryProvider; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - - IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree(); - - //prepare a LocalResourceMetadata which will be stored in NC's local resource repository - IResourceFactory localResourceMetadata = new LSMRTreeLocalResourceMetadataFactory(secondaryTypeTraits, - secondaryComparatorFactories, btreeCompFactories, valueProviderFactories, RTreePolicyType.RTREE, - 
MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, filterTypeTraits, - filterCmpFactories, rtreeFields, primaryKeyFields, secondaryFilterFields, isPointMBR); - localResourceFactoryProvider = - new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMRTreeResource); - indexDataflowHelperFactory = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(valueProviderFactories, - RTreePolicyType.RTREE, btreeCompFactories, - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), rtreeFields, - filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp, isPointMBR); - } else { - // External dataset - // Prepare a LocalResourceMetadata which will be stored in NC's local resource repository - IResourceFactory localResourceMetadata = new ExternalRTreeLocalResourceMetadataFactory(secondaryTypeTraits, - secondaryComparatorFactories, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(), - valueProviderFactories, RTreePolicyType.RTREE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties, primaryKeyFields, - isPointMBR); - localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata, - LocalResource.ExternalRTreeResource); - - indexDataflowHelperFactory = new ExternalRTreeDataflowHelperFactory(valueProviderFactories, - RTreePolicyType.RTREE, ExternalIndexingOperations.getBuddyBtreeComparatorFactories(), - mergePolicyFactory, mergePolicyFactoryProperties, - new 
SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numNestedSecondaryKeyFields }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR); - } - - TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, secondaryTypeTraits, secondaryComparatorFactories, null, - indexDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE, - LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, - secondaryPartitionConstraint); - spec.addRoot(secondaryIndexCreateOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - private IBinaryComparatorFactory[] getComparatorFactoriesForDeletedKeyBTree() { - IBinaryComparatorFactory[] btreeCompFactories = new IBinaryComparatorFactory[secondaryTypeTraits.length];; - int i = 0; - for (; i < secondaryComparatorFactories.length; i++) { - btreeCompFactories[i] = secondaryComparatorFactories[i]; - } - for (int j = 0; i < secondaryTypeTraits.length; i++, j++) { - btreeCompFactories[i] = primaryComparatorFactories[j]; - } - return btreeCompFactories; - } - - @Override - protected int getNumSecondaryKeys() { - return numNestedSecondaryKeyFields; - } - - @Override - protected void setSecondaryRecDescAndComparators(IndexType indexType, List<List<String>> secondaryKeyFields, - List<IAType> secondaryKeyTypes, int gramLength, MetadataProvider metadata) - throws AlgebricksException, AsterixException { - int numSecondaryKeys = 
secondaryKeyFields.size(); - if (numSecondaryKeys != 1) { - throw new AsterixException("Cannot use " + numSecondaryKeys - + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index."); - } - Pair<IAType, Boolean> spatialTypePair = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), itemType); - IAType spatialType = spatialTypePair.first; - anySecondaryKeyIsNullable = spatialTypePair.second; - if (spatialType == null) { - throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); - } - isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; - int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); - numNestedSecondaryKeyFields = numDimensions * 2; - int recordColumn = dataset.getDatasetType() == DatasetType.INTERNAL ? numPrimaryKeys : 0; - secondaryFieldAccessEvalFactories = - metadata.getFormat().createMBRFactory(isEnforcingKeyTypes ? 
enforcedItemType : itemType, - secondaryKeyFields.get(0), recordColumn, numDimensions, filterFieldName); - secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; - valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; - ISerializerDeserializer[] secondaryRecFields = - new ISerializerDeserializer[numPrimaryKeys + numNestedSecondaryKeyFields + numFilterFields]; - ISerializerDeserializer[] enforcedRecFields = new ISerializerDeserializer[1 + numPrimaryKeys + numFilterFields]; - secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; - ITypeTraits[] enforcedTypeTraits = new ITypeTraits[1 + numPrimaryKeys]; - IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); - keyType = nestedKeyType.getTypeTag(); - for (int i = 0; i < numNestedSecondaryKeyFields; i++) { - ISerializerDeserializer keySerde = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(nestedKeyType); - secondaryRecFields[i] = keySerde; - secondaryComparatorFactories[i] = - BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(nestedKeyType, true); - secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType); - valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE; - - } - // Add serializers and comparators for primary index fields. 
- if (dataset.getDatasetType() == DatasetType.INTERNAL) { - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i]; - secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i]; - enforcedRecFields[i] = primaryRecDesc.getFields()[i]; - enforcedTypeTraits[i] = primaryRecDesc.getTypeTraits()[i]; - } - } else { - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i); - secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i); - enforcedRecFields[i] = IndexingConstants.getSerializerDeserializer(i); - enforcedTypeTraits[i] = IndexingConstants.getTypeTraits(i); - } - } - enforcedRecFields[numPrimaryKeys] = - SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); - enforcedRecDesc = new RecordDescriptor(enforcedRecFields, enforcedTypeTraits); - if (numFilterFields > 0) { - rtreeFields = new int[numNestedSecondaryKeyFields + numPrimaryKeys]; - for (int i = 0; i < rtreeFields.length; i++) { - rtreeFields[i] = i; - } - - Pair<IAType, Boolean> typePair = Index.getNonNullableKeyFieldType(filterFieldName, itemType); - IAType type = typePair.first; - ISerializerDeserializer serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type); - secondaryRecFields[numPrimaryKeys + numNestedSecondaryKeyFields] = serde; - } - secondaryRecDesc = new RecordDescriptor(secondaryRecFields); - primaryKeyFields = new int[numPrimaryKeys]; - for (int i = 0; i < primaryKeyFields.length; i++) { - primaryKeyFields[i] = i + numNestedSecondaryKeyFields; - } - if (isPointMBR) { - int numNestedSecondaryKeyFieldForPointMBR = numNestedSecondaryKeyFields / 2; - ISerializerDeserializer[] recFieldsForPointMBR = new ISerializerDeserializer[numPrimaryKeys - + numNestedSecondaryKeyFieldForPointMBR + numFilterFields]; - int idx = 0; - for (int i = 
0; i < numNestedSecondaryKeyFieldForPointMBR; i++) { - recFieldsForPointMBR[idx++] = secondaryRecFields[i]; - } - for (int i = 0; i < numPrimaryKeys + numFilterFields; i++) { - recFieldsForPointMBR[idx++] = secondaryRecFields[numNestedSecondaryKeyFields + i]; - } - secondaryRecDescForPointMBR = new RecordDescriptor(recFieldsForPointMBR); - } - } - - @Override - public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException { - /*************************************************** - * [ About PointMBR Optimization ] - * Instead of storing a MBR(4 doubles) for a point(2 doubles) in RTree leaf node, PointMBR concept is introduced. - * PointMBR is a way to store a point as 2 doubles in RTree leaf node. - * This reduces RTree index size roughly in half. - * In order to fully benefit from the PointMBR concept, besides RTree, - * external sort operator during bulk-loading (from either data loading or index creation) - * must deal with point as 2 doubles instead of 4 doubles. Otherwise, external sort will suffer from twice as - * many doubles as it actually requires. For this purpose, PointMBR specific optimization logic is added as follows: - * 1) CreateMBR function in assign operator generates 2 doubles, instead of 4 doubles. - * 2) External sort operator sorts points represented with 2 doubles. - * 3) Bulk-loading in RTree takes 4 doubles by reading 2 doubles twice and then, do the same work as non-point MBR cases. - ***************************************************/ - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - boolean temp = dataset.getDatasetDetails().isTemp(); - int[] fieldPermutation = createFieldPermutationForBulkLoadOp(numNestedSecondaryKeyFields); - int numNestedSecondaryKeFieldsConsideringPointMBR = - isPointMBR ? numNestedSecondaryKeyFields / 2 : numNestedSecondaryKeyFields; - RecordDescriptor secondaryRecDescConsideringPointMBR = - isPointMBR ? 
secondaryRecDescForPointMBR : secondaryRecDesc; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - // Create dummy key provider for feeding the primary index scan. - AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); - - // Create primary index scan op. - BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); - - // Assign op. - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { - sourceOp = createCastOp(spec, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, sourceOp, - numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR, - secondaryRecDescConsideringPointMBR); - } - - // Sort by secondary keys. - ExternalSortOperatorDescriptor sortOp = - createSortOp(spec, - new IBinaryComparatorFactory[] { MetadataProvider.proposeLinearizer(keyType, - secondaryComparatorFactories.length) }, - isPointMBR ? 
secondaryRecDescForPointMBR : secondaryRecDesc); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - - IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree(); - IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory( - valueProviderFactories, RTreePolicyType.RTREE, btreeCompFactories, - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), rtreeFields, - filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp, isPointMBR);; - - // Create secondary RTree bulk load op. - TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = - createTreeIndexBulkLoadOp(spec, fieldPermutation, idff, GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, new RecordDescriptor[] {}); - // Connect the operators. 
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - spec.addRoot(metaOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - } else { - // External dataset - /* - * In case of external data, this method is used to build loading jobs for both initial load on index creation - * and transaction load on dataset refresh - */ - // Create external indexing scan operator - ExternalDataScanOperatorDescriptor primaryScanOp = createExternalIndexingOp(spec); - AbstractOperatorDescriptor sourceOp = primaryScanOp; - if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) { - sourceOp = createCastOp(spec, dataset.getDatasetType()); - spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0); - } - // Assign op. - AlgebricksMetaOperatorDescriptor asterixAssignOp = createExternalAssignOp(spec, - numNestedSecondaryKeFieldsConsideringPointMBR, secondaryRecDescConsideringPointMBR); - - // If any of the secondary fields are nullable, then add a select op that filters nulls. - AlgebricksMetaOperatorDescriptor selectOp = null; - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeFieldsConsideringPointMBR, - secondaryRecDescConsideringPointMBR); - } - - // Sort by secondary keys. 
- ExternalSortOperatorDescriptor sortOp = - createSortOp(spec, - new IBinaryComparatorFactory[] { MetadataProvider.proposeLinearizer(keyType, - secondaryComparatorFactories.length) }, - isPointMBR ? secondaryRecDescForPointMBR : secondaryRecDesc); - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - - // Create the dataflow helper factory - ExternalRTreeDataflowHelperFactory dataflowHelperFactory = new ExternalRTreeDataflowHelperFactory( - valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories, mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { numNestedSecondaryKeyFields }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR); - // Create secondary RTree bulk load op. 
- IOperatorDescriptor root; - AbstractTreeIndexOperatorDescriptor secondaryBulkLoadOp; - if (externalFiles != null) { - // Transaction load - secondaryBulkLoadOp = createExternalIndexBulkModifyOp(spec, fieldPermutation, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - root = secondaryBulkLoadOp; - } else { - // Initial load - secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, fieldPermutation, dataflowHelperFactory, - GlobalConfig.DEFAULT_TREE_FILL_FACTOR); - AlgebricksMetaOperatorDescriptor metaOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 0, - new IPushRuntimeFactory[] { new SinkRuntimeFactory() }, - new RecordDescriptor[] { secondaryRecDesc }); - spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoadOp, 0, metaOp, 0); - root = metaOp; - } - - spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, asterixAssignOp, 0); - if (anySecondaryKeyIsNullable || isEnforcingKeyTypes) { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); - spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); - } else { - spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); - } - spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); - spec.addRoot(root); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - } - return spec; - } - - protected int[] createFieldPermutationForBulkLoadOp(int numSecondaryKeyFields) { - int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields]; - int numSecondaryKeyFieldsForPointMBR = numSecondaryKeyFields / 2; - int end = isPointMBR ? 
numSecondaryKeyFieldsForPointMBR : fieldPermutation.length; - for (int i = 0; i < end; i++) { - fieldPermutation[i] = i; - } - if (isPointMBR) { - /******************************************************************************* - * For example, suppose that 2d point type data is indexed using RTree, there are no - * filter fields, and a primary key consists of a single field. - * ========== Without PointMBR optimization ========== - * If there is no point type optimization, the input operator of RTree's TreeIndexBulkLoadOperator - * delivers five variables to the TreeIndexBulkLoadOperator as follows: - * [$var1, $var2, $var3, $var4, $var5] - * where $var1 ~ $var4 together represent an MBR of a point object. - * Since it is a point object, $var1 and $var3 always have identical values. So do $var2 and $var4. - * $var5 represents a primary key value. - * fieldPermutation variable captures this order correctly by putting values in the array as follows: - * [0,1,2,3,4] - * =========== With PointMBR optimization =========== - * With PointMBR optimization, the input operator of RTree's TreeIndexBulkLoadOperator - * delivers 3 variables to the TreeIndexBulkLoadOperator as follows: - * [$var1, $var2, $var3] - * where $var1 and $var2 together represent an MBR of a point object. - * $var3 represents a primary key value. - * fieldPermutation variable captures this order correctly by putting values in the array as follows: - * [0,1,0,1,2] - * This means that bulkloadOp reads the pair of $var1 and $var2 twice in order to provide the same - * output just like when there were no PointMBR optimization available. - * This adjustment is done in this if clause code. 
- *********************************************************************************/ - int idx = numSecondaryKeyFieldsForPointMBR; - //add the rest of the sk fields for pointMBR - for (int i = 0; i < numSecondaryKeyFieldsForPointMBR; i++) { - fieldPermutation[idx++] = i; - } - //add the pk and filter fields - end = numSecondaryKeyFieldsForPointMBR + numPrimaryKeys + numFilterFields; - for (int i = numSecondaryKeyFieldsForPointMBR; i < end; i++) { - fieldPermutation[idx++] = i; - } - } - return fieldPermutation; - } - - @Override - public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - - StorageProperties storageProperties = propertiesProvider.getStorageProperties(); - boolean temp = dataset.getDatasetDetails().isTemp(); - LSMTreeIndexCompactOperatorDescriptor compactOp; - if (dataset.getDatasetType() == DatasetType.INTERNAL) { - IBinaryComparatorFactory[] btreeCompFactories = getComparatorFactoriesForDeletedKeyBTree();; - IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory( - valueProviderFactories, RTreePolicyType.RTREE, btreeCompFactories, - new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), mergePolicyFactory, - mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), rtreeFields, - filterTypeTraits, filterCmpFactories, secondaryFilterFields, !temp, isPointMBR); - compactOp = - new LSMTreeIndexCompactOperatorDescriptor(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields, idff, - NoOpOperationCallbackFactory.INSTANCE, 
LSMIndexUtil.getMetadataPageManagerFactory()); - } else { - // External dataset - compactOp = - new LSMTreeIndexCompactOperatorDescriptor(spec, RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER, secondaryFileSplitProvider, - secondaryTypeTraits, secondaryComparatorFactories, secondaryBloomFilterKeyFields, - new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE, - primaryComparatorFactories, mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - storageProperties.getBloomFilterFalsePositiveRate(), - new int[] { numNestedSecondaryKeyFields }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true, isPointMBR), - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - } - - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondaryPartitionConstraint); - spec.addRoot(compactOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java new file mode 100644 index 0000000..d0d14c8 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/StorageComponentProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.file; + +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.context.ITransactionSubsystemProvider; +import org.apache.asterix.common.context.TransactionSubsystemProvider; +import org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory; +import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; +import org.apache.asterix.formats.nontagged.TypeTraitProvider; +import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; +import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; +import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; +import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; +import org.apache.hyracks.storage.am.common.freepage.AppendOnlyLinkedMetadataPageManagerFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; +import org.apache.hyracks.storage.common.IStorageManager; + +public class StorageComponentProvider implements IStorageComponentProvider { + + @Override + public IPrimitiveValueProviderFactory getPrimitiveValueProviderFactory() { + return PrimitiveValueProviderFactory.INSTANCE; + } + + @Override + public ITransactionSubsystemProvider getTransactionSubsystemProvider() { + return TransactionSubsystemProvider.INSTANCE; + } + + @Override + public ILSMIOOperationSchedulerProvider getIoOperationSchedulerProvider() { + return RuntimeComponentsProvider.RUNTIME_PROVIDER; + } + + @Override + public IMetadataPageManagerFactory getMetadataPageManagerFactory() { + return AppendOnlyLinkedMetadataPageManagerFactory.INSTANCE; + } + + @Override + public IBinaryComparatorFactoryProvider getComparatorFactoryProvider() { + return BinaryComparatorFactoryProvider.INSTANCE; + } + + @Override + public TypeTraitProvider getTypeTraitProvider() { + 
return TypeTraitProvider.INSTANCE; + } + + @Override + public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() { + return RuntimeComponentsProvider.RUNTIME_PROVIDER; + } + + @Override + public IStorageManager getStorageManager() { + return RuntimeComponentsProvider.RUNTIME_PROVIDER; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index b17a722..7955ad6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -29,13 +29,13 @@ import javax.servlet.Servlet; import org.apache.asterix.active.ActiveLifecycleListener; import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.api.http.server.FullApiServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; import org.apache.asterix.api.http.server.ConnectorApiServlet; import org.apache.asterix.api.http.server.DdlApiServlet; import org.apache.asterix.api.http.server.DiagnosticsApiServlet; import org.apache.asterix.api.http.server.FeedServlet; +import org.apache.asterix.api.http.server.FullApiServlet; import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet; import org.apache.asterix.api.http.server.QueryApiServlet; import org.apache.asterix.api.http.server.QueryResultApiServlet; @@ -46,23 +46,25 @@ import org.apache.asterix.api.http.server.ShutdownApiServlet; import 
org.apache.asterix.api.http.server.UpdateApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.api.http.servlet.ServletConstants; -import org.apache.asterix.app.cc.CompilerExtensionManager; +import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.cc.ResourceIdManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.utils.LetUtil.Lets; import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.bootstrap.AsterixStateProxy; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.runtime.job.resource.JobCapacityController; -import org.apache.asterix.runtime.util.AppContextInfo; +import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.application.ICCApplicationEntryPoint; import org.apache.hyracks.api.client.HyracksConnection; @@ -83,7 +85,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName()); private static IAsterixStateProxy proxy; protected ICCApplicationContext appCtx; - protected CompilerExtensionManager ccExtensionManager; + protected CCExtensionManager ccExtensionManager; + 
protected IStorageComponentProvider componentProvider; private IJobCapacityController jobCapacityController; protected WebManager webManager; @@ -98,17 +101,16 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { } appCtx.setThreadFactory(new AsterixThreadFactory(appCtx.getThreadFactory(), new LifeCycleComponentManager())); - GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection()); ILibraryManager libraryManager = new ExternalLibraryManager(); ResourceIdManager resourceIdManager = new ResourceIdManager(); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); - AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(), - libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE); - ccExtensionManager = new CompilerExtensionManager(getExtensions()); + componentProvider = new StorageComponentProvider(); + GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection(), componentProvider); + AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), libraryManager, resourceIdManager, + () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance()); + ccExtensionManager = new CCExtensionManager(getExtensions()); AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager); - final CCConfig ccConfig = controllerService.getCCConfig(); - if (System.getProperty("java.rmi.server.hostname") == null) { System.setProperty("java.rmi.server.hostname", ccConfig.clusterNetIpAddress); } @@ -119,8 +121,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { MetadataManager.initialize(proxy, metadataProperties); - AppContextInfo.INSTANCE.getCCApplicationContext() - .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE); + AppContextInfo.INSTANCE.getCCApplicationContext().addJobLifecycleListener(ActiveLifecycleListener.INSTANCE); // create event loop groups webManager = new WebManager(); @@ -161,19 +162,19 
@@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { } protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception { - HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties - .getWebInterfacePort()); + HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), + externalProperties.getWebInterfacePort()); IHyracksClientConnection hcc = getNewHyracksClientConnection(); webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); - webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, ccExtensionManager - .getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(), ccExtensionManager - .getQueryTranslatorFactory())); + webServer.addLet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, + ccExtensionManager.getAqlCompilationProvider(), ccExtensionManager.getSqlppCompilationProvider(), + ccExtensionManager.getQueryTranslatorFactory(), componentProvider)); return webServer; } protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception { - HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties - .getAPIServerPort()); + HttpServer jsonAPIServer = + new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort()); IHyracksClientConnection hcc = getNewHyracksClientConnection(); jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_BUILD_PROP_ATTR, AppContextInfo.INSTANCE); @@ -211,8 +212,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { } protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception { - HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties - .getQueryWebInterfacePort()); + HttpServer queryWebServer = new 
HttpServer(webManager.getBosses(), webManager.getWorkers(), + externalProperties.getQueryWebInterfacePort()); IHyracksClientConnection hcc = getNewHyracksClientConnection(); queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); queryWebServer.addLet(new QueryWebInterfaceServlet(queryWebServer.ctx(), new String[] { "/*" })); @@ -229,8 +230,8 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { } protected HttpServer setupFeedServer(ExternalProperties externalProperties) throws Exception { - HttpServer feedServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties - .getFeedServerPort()); + HttpServer feedServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), + externalProperties.getFeedServerPort()); feedServer.setAttribute(HYRACKS_CONNECTION_ATTR, getNewHyracksClientConnection()); feedServer.addLet(new FeedServlet(feedServer.ctx(), new String[] { "/" })); return feedServer; @@ -240,35 +241,35 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { switch (key) { case AQL: return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case AQL_QUERY: return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case AQL_UPDATE: return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case AQL_DDL: return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getAqlCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), 
componentProvider); case SQLPP: return new FullApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case SQLPP_QUERY: return new QueryApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case SQLPP_UPDATE: return new UpdateApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case SQLPP_DDL: return new DdlApiServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(), - ccExtensionManager.getQueryTranslatorFactory()); + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case QUERY_STATUS: return new QueryStatusApiServlet(server.ctx(), paths); case QUERY_RESULT: return new QueryResultApiServlet(server.ctx(), paths); case QUERY_SERVICE: - return new QueryServiceServlet(server.ctx(), paths, ccExtensionManager - .getSqlppCompilationProvider(), ccExtensionManager.getQueryTranslatorFactory()); + return new QueryServiceServlet(server.ctx(), paths, ccExtensionManager.getSqlppCompilationProvider(), + ccExtensionManager.getQueryTranslatorFactory(), componentProvider); case CONNECTOR: return new ConnectorApiServlet(server.ctx(), paths); case SHUTDOWN: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java 
index 41d8b0d..53b577a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -41,7 +41,7 @@ import org.apache.asterix.metadata.cluster.AddNodeWorkResponse; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse; -import org.apache.asterix.runtime.util.ClusterStateManager; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.application.IClusterLifecycleListener; import org.apache.hyracks.api.exceptions.HyracksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java index b1d8dd3..71fff3f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java @@ -30,7 +30,7 @@ import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.metadata.cluster.AddNodeWork; import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; -import org.apache.asterix.runtime.util.ClusterStateManager; +import org.apache.asterix.runtime.utils.ClusterStateManager; public class ClusterWorkExecutor implements Runnable { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index d437b5b..002e270 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -25,14 +25,14 @@ import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.app.external.ExternalIndexingOperations; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.IClusterManagementWorkResponse; -import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger; +import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; +import org.apache.asterix.common.config.DatasetConfig.TransactionState; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; @@ -41,22 +41,25 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.ExternalDatasetDetails; import org.apache.asterix.metadata.entities.Index; 
+import org.apache.asterix.metadata.utils.ExternalIndexingOperations; import org.apache.asterix.metadata.utils.MetadataConstants; -import org.apache.asterix.runtime.util.ClusterStateManager; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; -public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { +public class GlobalRecoveryManager implements IGlobalRecoveryManager { private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName()); private static GlobalRecoveryManager instance; private static ClusterState state; + private final IStorageComponentProvider componentProvider; private HyracksConnection hcc; - private GlobalRecoveryManager(HyracksConnection hcc) { + private GlobalRecoveryManager(HyracksConnection hcc, IStorageComponentProvider componentProvider) { setState(ClusterState.UNUSABLE); this.hcc = hcc; + this.componentProvider = componentProvider; } @Override @@ -107,7 +110,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx); for (Dataverse dataverse : dataverses) { if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) { - MetadataProvider metadataProvider = new MetadataProvider(dataverse); + MetadataProvider metadataProvider = new MetadataProvider(dataverse, componentProvider); List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName()); for (Dataset dataset : datasets) { @@ -118,16 +121,16 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { dataset.getDataverseName(), dataset.getDatasetName()); if (!indexes.isEmpty()) { // Get the state of the dataset - ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset - .getDatasetDetails(); - 
ExternalDatasetTransactionState datasetState = dsd.getState(); - if (datasetState == ExternalDatasetTransactionState.BEGIN) { + ExternalDatasetDetails dsd = + (ExternalDatasetDetails) dataset.getDatasetDetails(); + TransactionState datasetState = dsd.getState(); + if (datasetState == TransactionState.BEGIN) { List<ExternalFile> files = MetadataManager.INSTANCE .getDatasetExternalFiles(mdTxnCtx, dataset); // if persumed abort, roll backward // 1. delete all pending files for (ExternalFile file : files) { - if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) { + if (file.getPendingOp() != ExternalFilePendingOp.NO_OP) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); } } @@ -138,11 +141,11 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { executeHyracksJob(jobSpec); // 3. correct the dataset state ((ExternalDatasetDetails) dataset.getDatasetDetails()) - .setState(ExternalDatasetTransactionState.COMMIT); + .setState(TransactionState.COMMIT); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) { + } else if (datasetState == TransactionState.READY_TO_COMMIT) { List<ExternalFile> files = MetadataManager.INSTANCE .getDatasetExternalFiles(mdTxnCtx, dataset); // if ready to commit, roll forward @@ -153,15 +156,15 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { executeHyracksJob(jobSpec); // 2. 
add pending files in metadata for (ExternalFile file : files) { - if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) { + if (file.getPendingOp() == ExternalFilePendingOp.ADD_OP) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); - file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); + file.setPendingOp(ExternalFilePendingOp.NO_OP); MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); - } else if (file - .getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) { + } else if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { // find original file for (ExternalFile originalFile : files) { - if (originalFile.getFileName().equals(file.getFileName())) { + if (originalFile.getFileName() + .equals(file.getFileName())) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, @@ -170,10 +173,11 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { } } } else if (file - .getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) { + .getPendingOp() == ExternalFilePendingOp.APPEND_OP) { // find original file for (ExternalFile originalFile : files) { - if (originalFile.getFileName().equals(file.getFileName())) { + if (originalFile.getFileName() + .equals(file.getFileName())) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, @@ -187,7 +191,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { } // 3. 
correct the dataset state ((ExternalDatasetDetails) dataset.getDatasetDetails()) - .setState(ExternalDatasetTransactionState.COMMIT); + .setState(TransactionState.COMMIT); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); @@ -225,8 +229,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryMaanger { return instance; } - public static synchronized void instantiate(HyracksConnection hcc) { - instance = new GlobalRecoveryManager(hcc); + public static synchronized void instantiate(HyracksConnection hcc, IStorageComponentProvider componentProvider) { + instance = new GlobalRecoveryManager(hcc, componentProvider); } public static synchronized void setState(ClusterState state) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java deleted file mode 100644 index 5a3419a..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.util; - -import org.apache.asterix.common.config.CompilerProperties; -import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; -import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; -import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; - -public class FlushDatasetUtils { - - public static void flushDataset(IHyracksClientConnection 
hcc, MetadataProvider metadataProvider, - MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName) - throws Exception { - CompilerProperties compilerProperties = AppContextInfo.INSTANCE.getCompilerProperties(); - int frameSize = compilerProperties.getFrameSize(); - JobSpecification spec = new JobSpecification(frameSize); - - RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) }; - AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1, - new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs); - - org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId(); - Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); - FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId, - dataset.getDatasetId()); - - spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0); - - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, - dataset.getDatasetDetails().isTemp()); - AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; - - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource, - primaryPartitionConstraint); - - JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true); - spec.setJobletEventListenerFactory(jobEventListenerFactory); - JobUtils.runJob(hcc, spec, true); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java deleted file mode 100644 index fb50b0c..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.util; - -import org.apache.asterix.api.common.Job; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; - -public class JobUtils { - - public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) - throws Exception { - JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, waitForCompletion); - return jobIds[0]; - } - - public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, boolean waitForCompletion) - throws Exception { - JobId[] startedJobIds = new JobId[jobs.length]; - for (int i = 0; i < jobs.length; i++) { - JobSpecification spec = jobs[i].getJobSpec(); - spec.setMaxReattempts(0); - JobId jobId = hcc.startJob(spec); - startedJobIds[i] = jobId; - if (waitForCompletion) { - hcc.waitForCompletion(jobId); - } - } - return startedJobIds; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java deleted file mode 100644 index 50a21bc..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/ResourceUtils.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.util; - -import org.apache.asterix.app.resource.RequiredCapacityVisitor; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; -import org.apache.hyracks.api.job.resource.ClusterCapacity; -import org.apache.hyracks.api.job.resource.IClusterCapacity; - -public class ResourceUtils { - - private ResourceUtils() { - } - - /** - * Calculates the required cluster capacity from a given query plan, the computation locations, - * the operator memory budgets, and frame size. - * - * @param plan, - * a given query plan. - * @param computationLocations, - * the partitions for computation. - * @param sortFrameLimit, - * the frame limit for one sorter partition. - * @param groupFrameLimit, - * the frame limit for one group-by partition. - * @param joinFrameLimit - * the frame limit for one joiner partition. - * @param frameSize - * the frame size used in query execution. - * @return the required cluster capacity for executing the query. - * @throws AlgebricksException - * if the query plan is malformed. 
- */ - public static IClusterCapacity getRequiredCompacity(ILogicalPlan plan, - AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit, - int joinFrameLimit, int frameSize) - throws AlgebricksException { - // Creates a cluster capacity visitor. - IClusterCapacity clusterCapacity = new ClusterCapacity(); - RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length, - sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize, clusterCapacity); - - // There could be only one root operator for a top-level query plan. - ILogicalOperator rootOp = plan.getRoots().get(0).getValue(); - rootOp.accept(visitor, null); - return clusterCapacity; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java new file mode 100644 index 0000000..48fd782 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.utils; + +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataverse; +import org.apache.asterix.runtime.utils.RuntimeUtils; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class DataverseUtil { + + private DataverseUtil() { + } + + public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) { + JobSpecification jobSpec = RuntimeUtils.createJobSpecification(); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = + metadata.splitAndConstraints(dataverse.getDataverseName()); + FileRemoveOperatorDescriptor frod = + new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false); + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second); + jobSpec.addRoot(frod); + return jobSpec; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java new file mode 100644 index 0000000..07eed0d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ExtensionUtil.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.utils; + +import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.algebra.base.ILangExtension.Language; +import org.apache.asterix.app.cc.IStatementExecutorExtension; +import org.apache.asterix.common.api.ExtensionId; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.metadata.api.IMetadataExtension; +import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.hyracks.algebricks.common.utils.Pair; + +/** + * Provide util methods dealing with extensions + */ +public class ExtensionUtil { + + private ExtensionUtil() { + } + + /** + * Verifies no conflict and return the language compilation provider + * + * @param lang + * the language for the passed compilation provider + * @param cp + * placeholder for compilation provider + * @param le + * user defined extension for compilation provider + * @return a pair of extension id and extended compilation provider + * @throws RuntimeDataException + * if there was a conflict between two extensions + */ + 
public static Pair<ExtensionId, ILangCompilationProvider> extendLangCompilationProvider(Language lang, + Pair<ExtensionId, ILangCompilationProvider> cp, ILangExtension le) throws RuntimeDataException { + if (cp != null && le.getLangCompilationProvider(lang) != null) { + throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, le.getId(), cp.first, + lang.toString()); + } + return (le.getLangCompilationProvider(lang) != null) + ? new Pair<>(le.getId(), le.getLangCompilationProvider(lang)) : cp; + } + + /** + * Validate no extension conflict and return statement executor extension + * + * @param qte + * place holder for statement executor extension + * @param extension + * user defined extension + * @return the user defined extension + * @throws RuntimeDataException + * if extension conflict was detected + */ + public static IStatementExecutorExtension extendStatementExecutor(IStatementExecutorExtension qte, + IStatementExecutorExtension extension) throws RuntimeDataException { + if (qte != null) { + throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, qte.getId(), extension.getId(), + IStatementExecutorFactory.class.getSimpleName()); + } + return extension; + } + + /** + * Validate no extension conflict and extends tuple translator provider + * + * @param metadataExtension + * place holder for tuple translator provider extension + * @param mde + * user defined metadata extension + * @return the metadata extension if the extension defines a metadata tuple translator, null otherwise + * @throws RuntimeDataException + * if an extension conflict was detected + */ + public static IMetadataExtension extendTupleTranslatorProvider(IMetadataExtension metadataExtension, + IMetadataExtension mde) throws RuntimeDataException { + if (metadataExtension != null) { + throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, metadataExtension.getId(), + mde.getId(), IMetadataExtension.class.getSimpleName()); + } + return 
mde.getMetadataTupleTranslatorProvider() == null ? null : mde; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java new file mode 100644 index 0000000..bc8a79e --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.utils; + +import org.apache.asterix.common.config.CompilerProperties; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor; +import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; +import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; + +public class FlushDatasetUtil { + private FlushDatasetUtil() { + } + + public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider metadataProvider, + String dataverseName, String datasetName, String indexName) throws Exception { + CompilerProperties compilerProperties = AppContextInfo.INSTANCE.getCompilerProperties(); + int frameSize = compilerProperties.getFrameSize(); + JobSpecification spec = new JobSpecification(frameSize); + + RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new 
ISerializerDeserializer[] {}) }; + AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1, + new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs); + + org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId(); + Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName); + FlushDatasetOperatorDescriptor flushOperator = + new FlushDatasetOperatorDescriptor(spec, jobId, dataset.getDatasetId()); + + spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0); + + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = + metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, indexName, + dataset.getDatasetDetails().isTemp()); + AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second; + + AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource, + primaryPartitionConstraint); + + JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true); + spec.setJobletEventListenerFactory(jobEventListenerFactory); + JobUtils.runJob(hcc, spec, true); + } + +}
