http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java deleted file mode 100644 index 95ce100..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.cc; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.algebra.base.ILangExtension; -import org.apache.asterix.algebra.base.ILangExtension.Language; -import org.apache.asterix.algebra.extension.IAlgebraExtensionManager; -import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.common.api.ExtensionId; -import org.apache.asterix.common.api.IExtension; -import org.apache.asterix.common.config.AsterixExtension; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.compiler.provider.SqlppCompilationProvider; -import org.apache.asterix.translator.IStatementExecutorFactory; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * AsterixDB's implementation of {@code IAlgebraExtensionManager} which takes care of - * initializing extensions for App and Compilation purposes - */ -public class CompilerExtensionManager implements IAlgebraExtensionManager { - - private final Map<ExtensionId, IExtension> extensions = new HashMap<>(); - - private final IStatementExecutorExtension statementExecutorExtension; - private final ILangCompilationProvider aqlCompilationProvider; - private final ILangCompilationProvider sqlppCompilationProvider; - private final DefaultStatementExecutorFactory defaultQueryTranslatorFactory; - - /** - * Initialize {@code CompilerExtensionManager} from configuration - * - * @param list - * @throws InstantiationException - * @throws IllegalAccessException - * @throws ClassNotFoundException - * @throws HyracksDataException - */ - public CompilerExtensionManager(List<AsterixExtension> list) - throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException { - Pair<ExtensionId, ILangCompilationProvider> aqlcp = null; - Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null; - IStatementExecutorExtension see = null; - defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory(); - - if (list != null) { - for (AsterixExtension extensionConf : list) { - IExtension extension = (IExtension) Class.forName(extensionConf.getClassName()).newInstance(); - extension.configure(extensionConf.getArgs()); - if (extensions.containsKey(extension.getId())) { - throw new RuntimeDataException(ErrorCode.EXTENSION_ID_CONFLICT, extension.getId()); - } - extensions.put(extension.getId(), extension); - switch (extension.getExtensionKind()) { - case STATEMENT_EXECUTOR: - see = extendStatementExecutor(see, (IStatementExecutorExtension) extension); - break; - case LANG: - ILangExtension le = (ILangExtension) extension; - aqlcp = extendLangCompilationProvider(Language.AQL, aqlcp, le); - sqlppcp = extendLangCompilationProvider(Language.SQLPP, sqlppcp, le); - break; - default: - break; - } - } - } - this.statementExecutorExtension = see; - this.aqlCompilationProvider = aqlcp == null ? new AqlCompilationProvider() : aqlcp.second; - this.sqlppCompilationProvider = sqlppcp == null ? new SqlppCompilationProvider() : sqlppcp.second; - } - - private Pair<ExtensionId, ILangCompilationProvider> extendLangCompilationProvider(Language lang, - Pair<ExtensionId, ILangCompilationProvider> cp, ILangExtension le) throws HyracksDataException { - if (cp != null && le.getLangCompilationProvider(lang) != null) { - throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, le.getId(), cp.first, - lang.toString()); - } - return (le.getLangCompilationProvider(lang) != null) - ? new Pair<>(le.getId(), le.getLangCompilationProvider(lang)) : cp; - } - - private IStatementExecutorExtension extendStatementExecutor(IStatementExecutorExtension qte, - IStatementExecutorExtension extension) throws HyracksDataException { - if (qte != null) { - throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, qte.getId(), extension.getId(), - IStatementExecutorFactory.class.getSimpleName()); - } - return extension; - } - - public IStatementExecutorFactory getQueryTranslatorFactory() { - return statementExecutorExtension == null ? defaultQueryTranslatorFactory - : statementExecutorExtension.getQueryTranslatorFactory(); - } - - public ILangCompilationProvider getAqlCompilationProvider() { - return aqlCompilationProvider; - } - - public ILangCompilationProvider getSqlppCompilationProvider() { - return sqlppCompilationProvider; - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java index a4b2345..f7b6842 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/IStatementExecutorExtension.java @@ -25,6 +25,7 @@ import org.apache.asterix.translator.IStatementExecutorFactory; * An interface for extensions of {@code IStatementExecutor} */ public interface IStatementExecutorExtension extends IExtension { + @Override default ExtensionKind getExtensionKind() { return ExtensionKind.STATEMENT_EXECUTOR; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java index 7b01169..f43092b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java @@ -23,7 +23,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.asterix.runtime.util.ClusterStateManager; +import org.apache.asterix.runtime.utils.ClusterStateManager; public class ResourceIdManager implements IResourceIdManager { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java deleted file mode 100644 index a2518ec..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java +++ /dev/null @@ -1,771 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.external; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; -import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; -import org.apache.asterix.common.config.DatasetConfig.IndexType; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.config.StorageProperties; -import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider; -import org.apache.asterix.common.dataflow.LSMIndexUtil; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; -import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; -import org.apache.asterix.common.transactions.IResourceFactory; -import org.apache.asterix.dataflow.data.nontagged.valueproviders.PrimitiveValueProviderFactory; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.FilesIndexDescription; -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesCommitOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalDatasetIndexesRecoverOperatorDescriptor; -import org.apache.asterix.external.operators.ExternalFilesIndexOperatorDescriptor; -import org.apache.asterix.external.operators.IndexInfoOperatorDescriptor; -import org.apache.asterix.external.provider.AdapterFactoryProvider; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.file.IndexOperations; -import org.apache.asterix.file.JobSpecificationUtils; -import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; -import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; -import org.apache.asterix.formats.nontagged.TypeTraitProvider; -import org.apache.asterix.metadata.MetadataException; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.entities.ExternalDatasetDetails; -import org.apache.asterix.metadata.entities.Index; -import org.apache.asterix.metadata.utils.DatasetUtils; -import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.util.NonTaggedFormatUtil; -import org.apache.asterix.runtime.util.AppContextInfo; -import org.apache.asterix.runtime.util.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider; -import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadataFactory; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; -import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; -import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; -import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor; -import org.apache.hyracks.storage.am.lsm.rtree.dataflow.ExternalRTreeDataflowHelperFactory; -import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType; -import org.apache.hyracks.storage.common.file.LocalResource; - -public class ExternalIndexingOperations { - - public static final List<List<String>> FILE_INDEX_FIELD_NAMES = Collections - .singletonList(Collections.singletonList("")); - public static final List<IAType> FILE_INDEX_FIELD_TYPES = Collections.singletonList(BuiltinType.ASTRING); - - private ExternalIndexingOperations() { - } - - public static boolean isIndexible(ExternalDatasetDetails ds) { - String adapter = ds.getAdapter(); - if (adapter.equalsIgnoreCase(ExternalDataConstants.ALIAS_HDFS_ADAPTER)) { - return true; - } - return false; - } - - public static boolean isRefereshActive(ExternalDatasetDetails ds) { - return ds.getState() != ExternalDatasetTransactionState.COMMIT; - } - - public static boolean isValidIndexName(String datasetName, String indexName) { - return (!datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX).equals(indexName)); - } - - public static String getFilesIndexName(String datasetName) { - return datasetName.concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX); - } - - public static int getRIDSize(Dataset dataset) { - ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails()); - return IndexingConstants.getRIDSize(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)); - } - - public static IBinaryComparatorFactory[] getComparatorFactories(Dataset dataset) { - ExternalDatasetDetails dsd = ((ExternalDatasetDetails) dataset.getDatasetDetails()); - return IndexingConstants.getComparatorFactories(dsd.getProperties().get(IndexingConstants.KEY_INPUT_FORMAT)); - } - - public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() { - return IndexingConstants.getBuddyBtreeComparatorFactories(); - } - - public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) - throws AlgebricksException { - ArrayList<ExternalFile> files = new ArrayList<>(); - ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - try { - // Create the file system object - FileSystem fs = getFileSystemObject(datasetDetails.getProperties()); - // Get paths of dataset - String path = datasetDetails.getProperties().get(ExternalDataConstants.KEY_PATH); - String[] paths = path.split(","); - - // Add fileStatuses to files - for (String aPath : paths) { - FileStatus[] fileStatuses = fs.listStatus(new Path(aPath)); - for (int i = 0; i < fileStatuses.length; i++) { - int nextFileNumber = files.size(); - if (fileStatuses[i].isDirectory()) { - listSubFiles(dataset, fs, fileStatuses[i], files); - } else { - files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, - fileStatuses[i].getPath().toUri().getPath(), - new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(), - ExternalFilePendingOp.PENDING_NO_OP)); - } - } - } - // Close file system - fs.close(); - if (files.size() == 0) { - throw new AlgebricksException("File Snapshot retrieved from external file system is empty"); - } - return files; - } catch (Exception e) { - e.printStackTrace(); - throw new AlgebricksException("Unable to get list of HDFS files " + e); - } - } - - /* list all files under the directory - * src is expected to be a folder - */ - private static void listSubFiles(Dataset dataset, FileSystem srcFs, FileStatus src, ArrayList<ExternalFile> files) - throws IOException { - Path path = src.getPath(); - FileStatus[] fileStatuses = srcFs.listStatus(path); - for (int i = 0; i < fileStatuses.length; i++) { - int nextFileNumber = files.size(); - if (fileStatuses[i].isDirectory()) { - listSubFiles(dataset, srcFs, fileStatuses[i], files); - } else { - files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber, - fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i].getModificationTime()), - fileStatuses[i].getLen(), ExternalFilePendingOp.PENDING_NO_OP)); - } - } - } - - public static FileSystem getFileSystemObject(Map<String, String> map) throws IOException { - Configuration conf = new Configuration(); - conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, map.get(ExternalDataConstants.KEY_HDFS_URL).trim()); - conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_CLASS, DistributedFileSystem.class.getName()); - return FileSystem.get(conf); - } - - public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset, - ArrayList<ExternalFile> externalFilesSnapshot, MetadataProvider metadataProvider, boolean createIndex) - throws MetadataException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; - StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(), - getFilesIndexName(dataset.getDatasetName()), true); - IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first; - FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - IResourceFactory localResourceMetadata = new ExternalBTreeLocalResourceMetadataFactory( - filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES, - new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties); - PersistentLocalResourceFactoryProvider localResourceFactoryProvider = - new PersistentLocalResourceFactoryProvider( - localResourceMetadata, LocalResource.ExternalBTreeResource); - ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory( - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, indexDataflowHelperFactory, localResourceFactoryProvider, - externalFilesSnapshot, createIndex, LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, externalFilesOp, - secondarySplitsAndConstraint.second); - spec.addRoot(externalFilesOp); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - /** - * This method create an indexing operator that index records in HDFS - * - * @param jobSpec - * @param itemType - * @param dataset - * @param files - * @param indexerDesc - * @return - * @throws AlgebricksException - * @throws HyracksDataException - * @throws Exception - */ - private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> - getExternalDataIndexingOperator( - MetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset, - List<ExternalFile> files, RecordDescriptor indexerDesc) - throws HyracksDataException, AlgebricksException { - ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); - Map<String, String> configuration = externalDatasetDetails.getProperties(); - IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory( - metadataProvider.getLibraryManager(), externalDatasetDetails.getAdapter(), configuration, - (ARecordType) itemType, files, true, null); - return new Pair<>(new ExternalDataScanOperatorDescriptor(jobSpec, indexerDesc, adapterFactory), - adapterFactory.getPartitionConstraint()); - } - - public static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp( - JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, ARecordType itemType, - RecordDescriptor indexerDesc, List<ExternalFile> files) throws HyracksDataException, AlgebricksException { - if (files == null) { - files = MetadataManager.INSTANCE.getDatasetExternalFiles(metadataProvider.getMetadataTxnContext(), dataset); - } - return getExternalDataIndexingOperator(metadataProvider, spec, itemType, dataset, files, indexerDesc); - } - - /** - * At the end of this method, we expect to have 4 sets as follows: - * metadataFiles should contain only the files that are appended in their original state - * addedFiles should contain new files that has number assigned starting after the max original file number - * deletedFiles should contain files that are no longer there in the file system - * appendedFiles should have the new file information of existing files - * The method should return false in case of zero delta - * - * @param dataset - * @param metadataFiles - * @param addedFiles - * @param deletedFiles - * @param appendedFiles - * @return - * @throws MetadataException - * @throws AlgebricksException - */ - public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles, - List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles) - throws MetadataException, AlgebricksException { - boolean uptodate = true; - int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1; - - ArrayList<ExternalFile> fileSystemFiles = getSnapshotFromExternalFileSystem(dataset); - - // Loop over file system files < taking care of added files > - for (ExternalFile fileSystemFile : fileSystemFiles) { - boolean fileFound = false; - Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator(); - while (mdFilesIterator.hasNext()) { - ExternalFile metadataFile = mdFilesIterator.next(); - if (fileSystemFile.getFileName().equals(metadataFile.getFileName())) { - // Same file name - if (fileSystemFile.getLastModefiedTime().equals(metadataFile.getLastModefiedTime())) { - // Same timestamp - if (fileSystemFile.getSize() == metadataFile.getSize()) { - // Same size -> no op - mdFilesIterator.remove(); - fileFound = true; - } else { - // Different size -> append op - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP); - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_APPEND_OP); - appendedFiles.add(fileSystemFile); - fileFound = true; - uptodate = false; - } - } else { - // Same file name, Different file mod date -> delete and add - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP); - deletedFiles - .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0, - metadataFile.getFileName(), metadataFile.getLastModefiedTime(), - metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP)); - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP); - fileSystemFile.setFileNumber(newFileNumber); - addedFiles.add(fileSystemFile); - newFileNumber++; - fileFound = true; - uptodate = false; - } - } - if (fileFound) { - break; - } - } - if (!fileFound) { - // File not stored previously in metadata -> pending add op - fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP); - fileSystemFile.setFileNumber(newFileNumber); - addedFiles.add(fileSystemFile); - newFileNumber++; - uptodate = false; - } - } - - // Done with files from external file system -> metadata files now contain both deleted files and appended ones - // first, correct number assignment to deleted and updated files - for (ExternalFile deletedFile : deletedFiles) { - deletedFile.setFileNumber(newFileNumber); - newFileNumber++; - } - for (ExternalFile appendedFile : appendedFiles) { - appendedFile.setFileNumber(newFileNumber); - newFileNumber++; - } - - // include the remaining deleted files - Iterator<ExternalFile> mdFilesIterator = metadataFiles.iterator(); - while (mdFilesIterator.hasNext()) { - ExternalFile metadataFile = mdFilesIterator.next(); - if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) { - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP); - deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), - newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), - metadataFile.getSize(), metadataFile.getPendingOp())); - newFileNumber++; - uptodate = false; - } - } - return uptodate; - } - - public static Dataset createTransactionDataset(Dataset dataset) { - ExternalDatasetDetails originalDsd = (ExternalDatasetDetails) dataset.getDatasetDetails(); - ExternalDatasetDetails dsd = new ExternalDatasetDetails(originalDsd.getAdapter(), originalDsd.getProperties(), - originalDsd.getTimestamp(), ExternalDatasetTransactionState.BEGIN); - Dataset transactionDatset = new Dataset(dataset.getDataverseName(), dataset.getDatasetName(), - dataset.getItemTypeDataverseName(), dataset.getItemTypeName(), dataset.getNodeGroupName(), - dataset.getCompactionPolicy(), dataset.getCompactionPolicyProperties(), dsd, dataset.getHints(), - DatasetType.EXTERNAL, dataset.getDatasetId(), dataset.getPendingOp()); - return transactionDatset; - } - - public static boolean isFileIndex(Index index) { - return (index.getIndexName().equals(getFilesIndexName(index.getDatasetName()))); - } - - public static JobSpecification buildDropFilesIndexJobSpec(CompiledIndexDropStatement indexDropStmt, - MetadataProvider metadataProvider, Dataset dataset) throws AlgebricksException, MetadataException { - String dataverseName = indexDropStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName() - : indexDropStmt.getDataverseName(); - String datasetName = indexDropStmt.getDatasetName(); - String indexName = indexDropStmt.getIndexName(); - boolean temp = dataset.getDatasetDetails().isTemp(); - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true); - StorageProperties storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); - IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - splitsAndConstraint.first, - new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp), - LSMIndexUtil.getMetadataPageManagerFactory()); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, - splitsAndConstraint.second); - spec.addRoot(btreeDrop); - - return spec; - } - - public static JobSpecification buildFilesIndexUpdateOp(Dataset ds, List<ExternalFile> metadataFiles, - List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, - MetadataProvider metadataProvider) throws MetadataException, AlgebricksException { - ArrayList<ExternalFile> files = new ArrayList<>(); - for (ExternalFile file : metadataFiles) { - if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) { - files.add(file); - } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) { - for (ExternalFile appendedFile : appendedFiles) { - if (appendedFile.getFileName().equals(file.getFileName())) { - files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(), - file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(), - ExternalFilePendingOp.PENDING_NO_OP)); - } - } - } - } - for (ExternalFile file : addedFiles) { - files.add(file); - } - Collections.sort(files); - return buildFilesIndexReplicationJobSpec(ds, files, metadataProvider, false); - } - - public static JobSpecification buildIndexUpdateOp(Dataset ds, Index index, List<ExternalFile> metadataFiles, - List<ExternalFile> deletedFiles, List<ExternalFile> addedFiles, List<ExternalFile> appendedFiles, - MetadataProvider metadataProvider) throws AsterixException, AlgebricksException { - // Create files list - ArrayList<ExternalFile> files = new ArrayList<>(); - - for (ExternalFile metadataFile : metadataFiles) { - if (metadataFile.getPendingOp() != ExternalFilePendingOp.PENDING_APPEND_OP) { - files.add(metadataFile); - } else { - metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); - files.add(metadataFile); - } - } - // add new files - for (ExternalFile file : addedFiles) { - files.add(file); - } - // add appended files - for (ExternalFile file : appendedFiles) { - files.add(file); - } - - CompiledCreateIndexStatement ccis = new CompiledCreateIndexStatement(index.getIndexName(), - index.getDataverseName(), index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType()); - return IndexOperations.buildSecondaryIndexLoadingJobSpec(ccis, null, null, null, null, metadataProvider, files); - } - - public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) - throws AlgebricksException, AsterixException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; - StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds, - metadataProvider.getMetadataTxnContext()); - boolean temp = ds.getDatasetDetails().isTemp(); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - getFilesIndexName(ds.getDatasetName()), temp); - IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; - ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds, - mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec); - IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); - - ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>(); - ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>(); - - for (Index index : indexes) { - if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - index.getIndexName(), temp); - if (index.getIndexType() == IndexType.BTREE) { - btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, spec)); - btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } else if (index.getIndexType() == IndexType.RTREE) { - rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, metadataProvider, spec)); - rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } - } - } - - ExternalDatasetIndexesCommitOperatorDescriptor op = new ExternalDatasetIndexesCommitOperatorDescriptor(spec, - filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos, - rtreeDataflowHelperFactories, rtreeInfos); - - spec.addRoot(op); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, - filesIndexSplitsAndConstraint.second); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - private static ExternalBTreeDataflowHelperFactory getFilesIndexDataflowHelperFactory(Dataset ds, - ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties, - StorageProperties storageProperties, JobSpecification spec) { - return new ExternalBTreeDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true); - } - - private static ExternalBTreeWithBuddyDataflowHelperFactory getBTreeDataflowHelperFactory(Dataset ds, Index index, - ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties, - StorageProperties storageProperties, JobSpecification spec) { - return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true); - } - - @SuppressWarnings("rawtypes") - private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index, - ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties, - StorageProperties storageProperties, MetadataProvider metadataProvider, JobSpecification spec) - throws AlgebricksException, AsterixException { - int numPrimaryKeys = getRIDSize(ds); - List<List<String>> secondaryKeyFields = index.getKeyFieldNames(); - secondaryKeyFields.size(); - ARecordType itemType = (ARecordType) metadataProvider.findType(ds.getItemTypeDataverseName(), - ds.getItemTypeName()); - Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType); - IAType spatialType = spatialTypePair.first; - if (spatialType == null) { - throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema."); - } - boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D; - int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); - int numNestedSecondaryKeyFields = numDimensions * 2; - IPrimitiveValueProviderFactory[] valueProviderFactories = - new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields]; - IBinaryComparatorFactory[] secondaryComparatorFactories = - new IBinaryComparatorFactory[numNestedSecondaryKeyFields]; - - ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys - + numNestedSecondaryKeyFields]; - ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys]; - IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag()); - ATypeTag keyType = nestedKeyType.getTypeTag(); - - keyType = nestedKeyType.getTypeTag(); - for (int i = 0; i < numNestedSecondaryKeyFields; i++) { - ISerializerDeserializer keySerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(nestedKeyType); - secondaryRecFields[i] = keySerde; - - secondaryComparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE - .getBinaryComparatorFactory(nestedKeyType, true); - secondaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType); - valueProviderFactories[i] = PrimitiveValueProviderFactory.INSTANCE; - } - // Add serializers and comparators for primary index fields. - for (int i = 0; i < numPrimaryKeys; i++) { - secondaryRecFields[numNestedSecondaryKeyFields + i] = IndexingConstants.getSerializerDeserializer(i); - secondaryTypeTraits[numNestedSecondaryKeyFields + i] = IndexingConstants.getTypeTraits(i); - } - int[] primaryKeyFields = new int[numPrimaryKeys]; - for (int i = 0; i < primaryKeyFields.length; i++) { - primaryKeyFields[i] = i + numNestedSecondaryKeyFields; - } - - return new ExternalRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE, - getBuddyBtreeComparatorFactories(), mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE, - MetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length), - storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() }, - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true, isPointMBR); - } - - public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) - throws AlgebricksException, AsterixException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; - StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds, - metadataProvider.getMetadataTxnContext()); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; - - boolean temp = ds.getDatasetDetails().isTemp(); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - getFilesIndexName(ds.getDatasetName()), temp); - IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; - ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds, - mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec); - IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); - - ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>(); - ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>(); - - for (Index index : indexes) { - if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - index.getIndexName(), temp); - if (index.getIndexType() == IndexType.BTREE) { - btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, spec)); - btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } else if (index.getIndexType() == IndexType.RTREE) { - rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, metadataProvider, spec)); - rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } - } - } - - ExternalDatasetIndexesAbortOperatorDescriptor op = new ExternalDatasetIndexesAbortOperatorDescriptor(spec, - filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos, - rtreeDataflowHelperFactories, rtreeInfos); - - spec.addRoot(op); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, - filesIndexSplitsAndConstraint.second); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - - } - - public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, MetadataProvider metadataProvider) - throws AlgebricksException, AsterixException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; - StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds, - metadataProvider.getMetadataTxnContext()); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; - boolean temp = ds.getDatasetDetails().isTemp(); - - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - getFilesIndexName(ds.getDatasetName()), temp); - IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first; - ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds, - mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec); - IndexInfoOperatorDescriptor filesIndexInfo = new IndexInfoOperatorDescriptor(filesIndexSplitProvider, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER); - - ArrayList<ExternalBTreeWithBuddyDataflowHelperFactory> btreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> btreeInfos = new ArrayList<>(); - ArrayList<ExternalRTreeDataflowHelperFactory> rtreeDataflowHelperFactories = new ArrayList<>(); - ArrayList<IndexInfoOperatorDescriptor> rtreeInfos = new ArrayList<>(); - - for (Index index : indexes) { - if (isValidIndexName(index.getDatasetName(), index.getIndexName())) { - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(), - index.getIndexName(), temp); - if (index.getIndexType() == IndexType.BTREE) { - btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, spec)); - btreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } else if (index.getIndexType() == IndexType.RTREE) { - rtreeDataflowHelperFactories.add(getRTreeDataflowHelperFactory(ds, index, mergePolicyFactory, - mergePolicyFactoryProperties, storageProperties, metadataProvider, spec)); - rtreeInfos.add(new IndexInfoOperatorDescriptor(indexSplitsAndConstraint.first, - RuntimeComponentsProvider.RUNTIME_PROVIDER, - RuntimeComponentsProvider.RUNTIME_PROVIDER)); - } - } - } - - ExternalDatasetIndexesRecoverOperatorDescriptor op = new ExternalDatasetIndexesRecoverOperatorDescriptor(spec, - filesIndexDataflowHelperFactory, filesIndexInfo, btreeDataflowHelperFactories, btreeInfos, - rtreeDataflowHelperFactories, rtreeInfos); - - spec.addRoot(op); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, op, - filesIndexSplitsAndConstraint.second); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } - - public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, MetadataProvider metadataProvider) - throws MetadataException, AlgebricksException { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IPropertiesProvider asterixPropertiesProvider = AppContextInfo.INSTANCE; - StorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties(); - Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, - metadataProvider.getMetadataTxnContext()); - ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first; - Map<String, String> mergePolicyFactoryProperties = compactionInfo.second; - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider - .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(), - getFilesIndexName(dataset.getDatasetName()), true); - IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first; - ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory( - mergePolicyFactory, mergePolicyFactoryProperties, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - RuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE, - storageProperties.getBloomFilterFalsePositiveRate(), - ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true); - FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec, - RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER, - secondaryFileSplitProvider, filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, - filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, indexDataflowHelperFactory, - NoOpOperationCallbackFactory.INSTANCE, LSMIndexUtil.getMetadataPageManagerFactory()); - spec.addRoot(compactOp); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp, - secondarySplitsAndConstraint.second); - spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); - return spec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java index 95fe68c..4680465 100755 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java @@ -43,11 +43,11 @@ import org.apache.asterix.external.library.LibraryAdapter; import org.apache.asterix.external.library.LibraryFunction; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.api.IMetadataEntity; import org.apache.asterix.metadata.entities.DatasourceAdapter; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.entities.Function; import org.apache.asterix.metadata.entities.Library; +import org.apache.asterix.metadata.utils.MetadataUtil; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; public class ExternalLibraryUtils { @@ -81,7 +81,7 @@ public class ExternalLibraryUtils { // get library file File libraryDir = new File(installLibDir.getAbsolutePath() + File.separator + dataverse + File.separator + library); - // install if needed (i,e, add the functions, adapters, datasources, parsers to the metadata) <Not required for use> + // install if needed (i,e, add the functions, adapters, datasources, parsers to the metadata) installLibraryIfNeeded(dataverse, libraryDir, uninstalledLibs); } } @@ -96,7 +96,7 @@ public class ExternalLibraryUtils { * @throws Exception */ private static Map<String, List<String>> uninstallLibraries() throws Exception { - Map<String, List<String>> uninstalledLibs = new HashMap<String, List<String>>(); + Map<String, List<String>> uninstalledLibs = new HashMap<>(); // get the directory of the un-install libraries File uninstallLibDir = getLibraryUninstallDir(); String[] uninstallLibNames; @@ -116,7 +116,7 @@ public class ExternalLibraryUtils { // add the library to the list of uninstalled libraries List<String> uinstalledLibsInDv = uninstalledLibs.get(dataverse); if (uinstalledLibsInDv == null) { - uinstalledLibsInDv = new ArrayList<String>(); + uinstalledLibsInDv = new ArrayList<>(); uninstalledLibs.put(dataverse, uinstalledLibsInDv); } uinstalledLibsInDv.add(libName); @@ -172,7 +172,8 @@ public class ExternalLibraryUtils { // belong to the library? if (adapter.getAdapterIdentifier().getName().startsWith(libraryName + "#")) { // remove adapter <! we didn't check if there are feeds which use this adapter> - MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, adapter.getAdapterIdentifier().getName()); + MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverse, + adapter.getAdapterIdentifier().getName()); } } // drop the library itself @@ -203,7 +204,8 @@ public class ExternalLibraryUtils { Library libraryInMetadata = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverse, libraryName); if (libraryInMetadata != null && !wasUninstalled) { // exists in metadata and was not un-installed, we return. - // Another place which shows that our metadata transactions are broken (we didn't call commit before!!!) + // Another place which shows that our metadata transactions are broken + // (we didn't call commit before!!!) MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); return; } @@ -235,13 +237,13 @@ public class ExternalLibraryUtils { Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); if (dv == null) { MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverse, - NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, IMetadataEntity.PENDING_NO_OP)); + NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, MetadataUtil.PENDING_NO_OP)); } // Add functions if (library.getLibraryFunctions() != null) { for (LibraryFunction function : library.getLibraryFunctions().getLibraryFunction()) { String[] fargs = function.getArguments().trim().split(","); - List<String> args = new ArrayList<String>(); + List<String> args = new ArrayList<>(); for (String arg : fargs) { args.add(arg); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java deleted file mode 100644 index 34746d3..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.external; - -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.asterix.active.ActiveJobNotificationHandler; -import org.apache.asterix.active.EntityId; -import org.apache.asterix.active.IActiveMessage; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.external.api.IAdapterFactory; -import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.feed.management.FeedEventsListener; -import org.apache.asterix.external.feed.message.EndFeedMessage; -import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; -import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; -import org.apache.asterix.external.operators.FeedMessageOperatorDescriptor; -import org.apache.asterix.external.util.FeedConstants; -import org.apache.asterix.external.util.FeedUtils; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.asterix.file.JobSpecificationUtils; -import org.apache.asterix.metadata.declared.MetadataProvider; -import org.apache.asterix.metadata.entities.Feed; -import org.apache.asterix.runtime.util.ClusterStateManager; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.Pair; -import org.apache.hyracks.algebricks.common.utils.Triple; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; -import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; - -/** - * Provides helper method(s) for creating JobSpec for operations on a feed. - */ -public class FeedOperations { - - /** - * Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor. - * - * @param primaryFeed - * @param metadataProvider - * @return JobSpecification the Hyracks job specification for receiving data from external source - * @throws Exception - */ - public static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed primaryFeed, - MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - spec.setFrameSize(FeedConstants.JobConstants.DEFAULT_FRAME_SIZE); - IAdapterFactory adapterFactory = null; - IOperatorDescriptor feedIngestor; - AlgebricksPartitionConstraint ingesterPc; - Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t = - metadataProvider.buildFeedIntakeRuntime(spec, primaryFeed, policyAccessor); - feedIngestor = t.first; - ingesterPc = t.second; - adapterFactory = t.third; - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedIngestor, ingesterPc); - NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, ingesterPc); - spec.connect(new OneToOneConnectorDescriptor(spec), feedIngestor, 0, nullSink, 0); - spec.addRoot(nullSink); - return new Pair<>(spec, adapterFactory); - } - - /** - * Builds the job spec for sending message to an active feed to disconnect it from the - * its source. - */ - public static Pair<JobSpecification, Boolean> buildDisconnectFeedJobSpec(MetadataProvider metadataProvider, - FeedConnectionId connectionId) throws AsterixException, AlgebricksException { - - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - IOperatorDescriptor feedMessenger; - AlgebricksPartitionConstraint messengerPc; - List<String> locations = null; - FeedRuntimeType sourceRuntimeType; - try { - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(connectionId.getFeedId()); - FeedConnectJobInfo cInfo = listener.getFeedConnectJobInfo(connectionId); - IFeedJoint sourceFeedJoint = cInfo.getSourceFeedJoint(); - IFeedJoint computeFeedJoint = cInfo.getComputeFeedJoint(); - - boolean terminateIntakeJob = false; - boolean completeDisconnect = computeFeedJoint == null || computeFeedJoint.getReceivers().isEmpty(); - if (completeDisconnect) { - sourceRuntimeType = FeedRuntimeType.INTAKE; - locations = cInfo.getCollectLocations(); - terminateIntakeJob = sourceFeedJoint.getReceivers().size() == 1; - } else { - locations = cInfo.getComputeLocations(); - sourceRuntimeType = FeedRuntimeType.COMPUTE; - } - - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = buildDisconnectFeedMessengerRuntime(spec, - connectionId, locations, sourceRuntimeType, completeDisconnect, sourceFeedJoint.getOwnerFeedId()); - - feedMessenger = p.first; - messengerPc = p.second; - - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc); - NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc); - spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0); - spec.addRoot(nullSink); - return new Pair<>(spec, terminateIntakeJob); - - } catch (AlgebricksException e) { - throw new AsterixException(e); - } - - } - - private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime( - JobSpecification jobSpec, FeedConnectionId feedConenctionId, IActiveMessage feedMessage, - Collection<String> locations) throws AlgebricksException { - AlgebricksPartitionConstraint partitionConstraint = - new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {})); - FeedMessageOperatorDescriptor feedMessenger = - new FeedMessageOperatorDescriptor(jobSpec, feedConenctionId, feedMessage); - return new Pair<>(feedMessenger, partitionConstraint); - } - - private static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime( - JobSpecification jobSpec, FeedConnectionId feedConenctionId, List<String> locations, - FeedRuntimeType sourceFeedRuntimeType, boolean completeDisconnection, EntityId sourceFeedId) - throws AlgebricksException { - IActiveMessage feedMessage = new EndFeedMessage(feedConenctionId, sourceFeedRuntimeType, sourceFeedId, - completeDisconnection, EndFeedMessage.EndMessageType.DISCONNECT_FEED); - return buildSendFeedMessageRuntime(jobSpec, feedConenctionId, feedMessage, locations); - } - - public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception { - JobSpecification spec = JobSpecificationUtils.createJobSpecification(); - AlgebricksAbsolutePartitionConstraint allCluster = ClusterStateManager.INSTANCE.getClusterLocations(); - Set<String> nodes = new TreeSet<>(); - for (String node : allCluster.getLocations()) { - nodes.add(node); - } - AlgebricksAbsolutePartitionConstraint locations = - new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[nodes.size()])); - FileSplit[] feedLogFileSplits = - FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits); - FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true); - AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second); - spec.addRoot(frod); - return spec; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java index 6be7af9..ec7c239 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java @@ -24,17 +24,19 @@ import java.util.List; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.app.translator.QueryTranslator; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.compiler.provider.AqlCompilationProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.feed.api.IFeedWork; import org.apache.asterix.external.feed.api.IFeedWorkEventListener; import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.management.FeedConnectionRequest.ConnectionStatus; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.statement.DataverseDecl; import org.apache.asterix.lang.common.struct.Identifier; -import org.apache.asterix.runtime.util.AppContextInfo; +import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; @@ -48,6 +50,7 @@ public class FeedWorkCollection { private static Logger LOGGER = Logger.getLogger(FeedWorkCollection.class.getName()); private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); + private static final IStorageComponentProvider storageComponentProvider = new StorageComponentProvider(); /** * The task of subscribing to a feed to obtain data. @@ -91,7 +94,8 @@ public class FeedWorkCollection { List<Statement> statements = new ArrayList<>(); statements.add(dataverseDecl); statements.add(subscribeStmt); - IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider); + IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider, + storageComponentProvider); translator.compileAndExecute(AppContextInfo.INSTANCE.getHcc(), null, QueryTranslator.ResultDelivery.IMMEDIATE); if (LOGGER.isEnabledFor(Level.INFO)) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index b1ca062..b114e8c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -29,26 +29,26 @@ import java.util.logging.Logger; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.BuildProperties; +import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.CompilerProperties; -import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.FeedProperties; +import org.apache.asterix.common.config.MessagingProperties; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.config.PropertiesAccessor; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.config.MessagingProperties; -import org.apache.asterix.common.context.FileMapManager; import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.context.FileMapManager; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; @@ -62,6 +62,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.external.library.ExternalLibraryManager; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -74,7 +75,6 @@ import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; -import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; @@ -99,13 +99,12 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; import org.apache.hyracks.storage.common.file.IResourceIdFactory; -public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvider { +public class NCAppRuntimeContext implements IAppRuntimeContext { private static final Logger LOGGER = Logger.getLogger(NCAppRuntimeContext.class.getName()); private ILSMMergePolicyFactory metadataMergePolicyFactory; private final INCApplicationContext ncApplicationContext; private final IResourceIdFactory resourceIdFactory; - private CompilerProperties compilerProperties; private ExternalProperties externalProperties; private MetadataProperties metadataProperties; @@ -115,7 +114,6 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi private BuildProperties buildProperties; private ReplicationProperties replicationProperties; private MessagingProperties messagingProperties; - private ThreadExecutor threadExecutor; private IDatasetLifecycleManager datasetLifecycleManager; private IFileMapManager fileMapManager; @@ -136,14 +134,14 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi private final ILibraryManager libraryManager; private final NCExtensionManager ncExtensionManager; + private final IStorageComponentProvider componentProvider; public NCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions) - throws AsterixException, InstantiationException, IllegalAccessException, - ClassNotFoundException, IOException { + throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException, + IOException { List<AsterixExtension> allExtensions = new ArrayList<>(); this.ncApplicationContext = ncApplicationContext; - PropertiesAccessor propertiesAccessor = - PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig()); + PropertiesAccessor propertiesAccessor = PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig()); compilerProperties = new CompilerProperties(propertiesAccessor); externalProperties = new ExternalProperties(propertiesAccessor); metadataProperties = new MetadataProperties(propertiesAccessor); @@ -159,6 +157,7 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi } allExtensions.addAll(new ExtensionProperties(propertiesAccessor).getExtensions()); ncExtensionManager = new NCExtensionManager(allExtensions); + componentProvider = new StorageComponentProvider(); resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory(); } @@ -181,16 +180,15 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi metadataMergePolicyFactory = new PrefixMergePolicyFactory(); ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = - new PersistentLocalResourceRepositoryFactory( - ioManager, ncApplicationContext.getNodeId(), metadataProperties); + new PersistentLocalResourceRepositoryFactory(ioManager, ncApplicationContext.getNodeId(), + metadataProperties); - localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory - .createRepository(); + localResourceRepository = + (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository(); - IAppRuntimeContextProvider asterixAppRuntimeContextProvider = - new AppRuntimeContextProviderForRecovery(this); - txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, - txnProperties); + IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this); + txnSubsystem = new TransactionSubsystem(ncApplicationContext, ncApplicationContext.getNodeId(), + asterixAppRuntimeContextProvider, txnProperties); IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager(); SystemState systemState = recoveryMgr.getSystemState(); @@ -448,9 +446,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi // This way we can delay the registration of the metadataNode until // it is completely initialized. MetadataManager.initialize(proxy, MetadataNode.INSTANCE); - MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse); + MetadataBootstrap.startUniverse(ncApplicationContext, newUniverse); MetadataBootstrap.startDDLRecovery(); - ncExtensionManager.initializeMetadata(); + ncExtensionManager.initializeMetadata(ncApplicationContext); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Metadata node bound"); @@ -473,4 +471,9 @@ public class NCAppRuntimeContext implements IAppRuntimeContext, IPropertiesProvi return ncExtensionManager; } + @Override + public IStorageComponentProvider getStorageComponentProvider() { + return componentProvider; + } + }
