http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index bb65b74..6fbf2a5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -25,13 +25,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.rmi.RemoteException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Date; -import java.util.Deque; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -47,22 +44,23 @@ import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.algebra.extension.IExtensionStatement; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.app.external.ExternalIndexingOperations; import org.apache.asterix.app.external.FeedJoint; -import org.apache.asterix.app.external.FeedOperations; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.result.ResultUtil; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; +import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.external.api.IAdapterFactory; import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber; @@ -81,10 +79,9 @@ import org.apache.asterix.external.feed.watch.FeedActivityDetails; import org.apache.asterix.external.feed.watch.FeedConnectJobInfo; import org.apache.asterix.external.feed.watch.FeedIntakeInfo; import org.apache.asterix.external.indexing.ExternalFile; +import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.asterix.file.DatasetOperations; -import org.apache.asterix.file.DataverseOperations; -import org.apache.asterix.file.IndexOperations; +import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement; import org.apache.asterix.lang.common.base.IReturningStatement; @@ -132,7 +129,6 @@ import org.apache.asterix.metadata.IDatasetDetails; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; -import org.apache.asterix.metadata.api.IMetadataEntity; import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; import org.apache.asterix.metadata.dataset.hints.DatasetHints; import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint; @@ -151,26 +147,25 @@ import org.apache.asterix.metadata.entities.InternalDatasetDetails; import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import org.apache.asterix.metadata.feeds.FeedMetadataUtil; -import org.apache.asterix.metadata.utils.DatasetUtils; +import org.apache.asterix.metadata.feeds.FeedOperations; +import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry; -import org.apache.asterix.metadata.utils.KeyFieldTypeUtils; +import org.apache.asterix.metadata.utils.ExternalIndexingOperations; +import org.apache.asterix.metadata.utils.IndexUtil; +import org.apache.asterix.metadata.utils.KeyFieldTypeUtil; import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.MetadataLockManager; +import org.apache.asterix.metadata.utils.MetadataUtil; +import org.apache.asterix.metadata.utils.TypeUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; -import org.apache.asterix.om.types.hierachy.ATypeHierarchy; -import org.apache.asterix.runtime.util.AppContextInfo; +import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement; -import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement; @@ -180,9 +175,8 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.TypeTranslator; import org.apache.asterix.translator.util.ValidateUtil; -import org.apache.asterix.util.FlushDatasetUtils; -import org.apache.asterix.util.JobUtils; -import org.apache.commons.lang3.ArrayUtils; +import org.apache.asterix.utils.DataverseUtil; +import org.apache.asterix.utils.FlushDatasetUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; @@ -216,27 +210,24 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private static final Logger LOGGER = Logger.getLogger(QueryTranslator.class.getName()); - protected enum ProgressState { - NO_PROGRESS, - ADDED_PENDINGOP_RECORD_TO_METADATA - } - public static final boolean IS_DEBUG_MODE = false;// true protected final List<Statement> statements; protected final SessionConfig sessionConfig; - protected Dataverse activeDefaultDataverse; + protected Dataverse activeDataverse; protected final List<FunctionDecl> declaredFunctions; protected final APIFramework apiFramework; protected final IRewriterFactory rewriterFactory; + protected final IStorageComponentProvider componentProvider; public QueryTranslator(List<Statement> statements, SessionConfig conf, - ILangCompilationProvider compliationProvider) { + ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider) { this.statements = statements; this.sessionConfig = conf; - this.declaredFunctions = getDeclaredFunctions(statements); - this.apiFramework = new APIFramework(compliationProvider); - this.rewriterFactory = compliationProvider.getRewriterFactory(); - activeDefaultDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; + this.componentProvider = componentProvider; + declaredFunctions = getDeclaredFunctions(statements); + apiFramework = new APIFramework(compliationProvider); + rewriterFactory = compliationProvider.getRewriterFactory(); + activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE; } protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) { @@ -286,9 +277,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (sessionConfig.is(SessionConfig.FORMAT_HTML)) { sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR); } - validateOperation(activeDefaultDataverse, stmt); + validateOperation(activeDataverse, stmt); rewriteStatement(stmt); // Rewrite the statement's AST. - MetadataProvider metadataProvider = new MetadataProvider(activeDefaultDataverse); + MetadataProvider metadataProvider = new MetadataProvider(activeDataverse, componentProvider); metadataProvider.setWriterFactory(writerFactory); metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider); metadataProvider.setOutputFile(outputFile); @@ -298,7 +289,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen handleSetStatement(stmt, config); break; case Statement.Kind.DATAVERSE_DECL: - activeDefaultDataverse = handleUseDataverseStatement(metadataProvider, stmt); + activeDataverse = handleUseDataverseStatement(metadataProvider, stmt); break; case Statement.Kind.CREATE_DATAVERSE: handleCreateDataverseStatement(metadataProvider, stmt); @@ -375,8 +366,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case Statement.Kind.QUERY: metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); - metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC - || resultDelivery == ResultDelivery.DEFERRED); + metadataProvider.setResultAsyncMode( + resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats); break; case Statement.Kind.COMPACT: @@ -450,8 +441,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleCreateDataverseStatement(MetadataProvider metadataProvider, Statement stmt) - throws Exception { + protected void handleCreateDataverseStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; String dvName = stmtCreateDataverse.getDataverseName().getValue(); @@ -470,7 +460,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), - new Dataverse(dvName, stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP)); + new Dataverse(dvName, stmtCreateDataverse.getFormat(), MetadataUtil.PENDING_NO_OP)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { abort(e, e, mdTxnCtx); @@ -488,8 +478,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new CompilationException("Unknown compaction policy: " + compactionPolicy); } String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName(); - ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class - .forName(compactionPolicyFactoryClassName).newInstance(); + ILSMMergePolicyFactory mergePolicyFactory = + (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { throw new CompilationException("The correlated-prefix merge policy cannot be used with external dataset."); } @@ -538,6 +528,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy); Dataset dataset = null; + Index primaryIndex = null; try { IDatasetDetails datasetDetails = null; @@ -556,8 +547,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (dt == null) { throw new AlgebricksException(": type " + itemTypeName + " could not be found."); } - String ngName = ngNameId != null ? ngNameId.getValue() - : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx); + String ngName = + ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx); if (compactionPolicy == null) { compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME; @@ -581,10 +572,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } ARecordType metaRecType = (ARecordType) metaItemType; - List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) - .getPartitioningExprs(); - List<Integer> keySourceIndicators = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) - .getKeySourceIndicators(); + List<List<String>> partitioningExprs = + ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); + List<Integer> keySourceIndicators = + ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators(); boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated(); ARecordType aRecordType = (ARecordType) itemType; List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType, @@ -607,10 +598,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case EXTERNAL: String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); - Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); + Map<String, String> properties = + ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); - datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), - ExternalDatasetTransactionState.COMMIT); + datasetDetails = + new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT); break; default: throw new CompilationException("Unknown datatype " + dd.getDatasetType()); @@ -625,14 +617,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties, datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(), - IMetadataEntity.PENDING_ADD_OP); + MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); - + primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName); if (dd.getDatasetType() == DatasetType.INTERNAL) { - Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), - dataverseName); - JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName, - metadataProvider); + Dataverse dataverse = + MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); + JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataverse, datasetName, metadataProvider); // #. make metadataTxn commit before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -650,7 +641,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); - dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP); + dataset.setPendingOp(MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { @@ -668,9 +659,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); try { - JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); + JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, primaryIndex, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; JobUtils.runJob(hcc, jobSpec, true); @@ -735,8 +725,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx) - throws CompilationException { + protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, + MetadataTransactionContext mdTxnCtx) throws CompilationException { int nodegroupCardinality; String nodegroupName; String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME); @@ -802,11 +792,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen JobSpecification spec = null; Dataset ds = null; // For external datasets - ArrayList<ExternalFile> externalFilesSnapshot = null; + List<ExternalFile> externalFilesSnapshot = null; boolean firstExternalDatasetIndex = false; boolean filesIndexReplicated = false; Index filesIndex = null; boolean datasetLocked = false; + Index index = null; try { ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); @@ -816,7 +807,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } indexName = stmtCreateIndex.getIndexName().getValue(); - Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, + index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName); Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), ds.getItemTypeDataverseName(), ds.getItemTypeName()); @@ -833,8 +824,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen int keyIndex = 0; for (Pair<List<String>, TypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) { IAType fieldType = null; - ARecordType subType = KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType, - metaRecordType); + ARecordType subType = + KeyFieldTypeUtil.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType); boolean isOpen = subType.isOpen(); int i = 0; if (fieldExpr.first.size() > 1 && !isOpen) { @@ -858,8 +849,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (stmtCreateIndex.hasMetaField()) { throw new AlgebricksException("Typed open index can only be created on the record part"); } - Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second, - indexName, dataverseName); + Map<TypeSignature, IAType> typeMap = + TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second, indexName, dataverseName); TypeSignature typeSignature = new TypeSignature(dataverseName, indexName); fieldType = typeMap.get(typeSignature); } @@ -876,7 +867,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ValidateUtil.validateKeyFields(aRecordType, metaRecordType, indexFields, keySourceIndicators, indexFieldTypes, stmtCreateIndex.getIndexType()); - if (idx != null) { + if (index != null) { if (stmtCreateIndex.getIfNotExists()) { MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); return; @@ -893,7 +884,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) { - List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(ds); + List<List<String>> partitioningKeys = DatasetUtil.getPartitioningKeys(ds); for (List<String> partitioningKey : partitioningKeys) { IAType keyType = aRecordType.getSubFieldType(partitioningKey); ITypeTraits typeTrait = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); @@ -925,7 +916,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Check if the files index exist filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); + datasetName, IndexingConstants.getFilesIndexName(datasetName)); firstExternalDatasetIndex = filesIndex == null; // Lock external dataset ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex); @@ -933,7 +924,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (firstExternalDatasetIndex) { // Verify that no one has created an index before we acquire the lock filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), - dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); + dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName)); if (filesIndex != null) { ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex); firstExternalDatasetIndex = false; @@ -944,11 +935,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Get snapshot from External File System externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds); // Add an entry for the files index - filesIndex = new Index(dataverseName, datasetName, - ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE, - ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, - ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, - IMetadataEntity.PENDING_ADD_OP); + filesIndex = + new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName), + IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null, + ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false, + MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); // Add files to the external files index for (ExternalFile file : externalFilesSnapshot) { @@ -970,33 +961,35 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (stmtCreateIndex.isEnforced()) { List<Index> indexes = MetadataManager.INSTANCE .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); - for (Index index : indexes) { - if (index.getKeyFieldNames().equals(indexFields) - && !index.getKeyFieldTypes().equals(indexFieldTypes) && index.isEnforcingKeyFileds()) { + for (Index existingIndex : indexes) { + if (existingIndex.getKeyFieldNames().equals(indexFields) + && !existingIndex.getKeyFieldTypes().equals(indexFieldTypes) + && existingIndex.isEnforcingKeyFileds()) { throw new CompilationException("Cannot create index " + indexName + " , enforced index " - + index.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',') - + "\" is already defined with type \"" + index.getKeyFieldTypes() + "\""); + + existingIndex.getIndexName() + " on field \"" + StringUtils.join(indexFields, ',') + + "\" is already defined with type \"" + existingIndex.getKeyFieldTypes() + "\""); } } } // #. add a new index with PendingAddOp - Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields, - keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(), - false, IMetadataEntity.PENDING_ADD_OP); + index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields, + keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), + stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); ARecordType enforcedType = null; + ARecordType enforcedMetaType = null; if (stmtCreateIndex.isEnforced()) { - enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index)); + Pair<ARecordType, ARecordType> enforcedTypes = + TypeUtil.createEnforcedType(aRecordType, metaRecordType, Lists.newArrayList(index)); + enforcedType = enforcedTypes.first; + enforcedMetaType = enforcedTypes.second; } // #. prepare to create the index artifact in NC. - CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, - index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType()); - spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, aRecordType, metaRecordType, - keySourceIndicators, enforcedType, metadataProvider); + spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, aRecordType, metaRecordType, enforcedType, + enforcedMetaType, metadataProvider); if (spec == null) { throw new CompilationException("Failed to create job spec for creating index '" + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'"); @@ -1014,12 +1007,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setMetadataTxnContext(mdTxnCtx); // #. load data into the index in NC. - cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(), - index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), - index.getGramLength(), index.getIndexType()); - - spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, aRecordType, metaRecordType, - keySourceIndicators, enforcedType, metadataProvider); + spec = IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, aRecordType, metaRecordType, enforcedType, + enforcedMetaType, metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -1033,14 +1022,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. add another new index with PendingNoOp after deleting the index with PendingAddOp MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName); - index.setPendingOp(IMetadataEntity.PENDING_NO_OP); + index.setPendingOp(MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); // add another new files index with PendingNoOp after deleting the index with // PendingAddOp if (firstExternalDatasetIndex) { - MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, - filesIndex.getIndexName()); - filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP); + MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, + datasetName, filesIndex.getIndexName()); + filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP); MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex); // update transaction timestamp ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date()); @@ -1056,11 +1045,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (filesIndexReplicated) { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - ExternalIndexingOperations.getFilesIndexName(datasetName)); try { - JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, - metadataProvider, ds); + JobSpecification jobSpec = + ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; JobUtils.runJob(hcc, jobSpec, true); @@ -1078,11 +1065,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); try { - JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, - ds); - + JobSpecification jobSpec = IndexUtil.buildDropSecondaryIndexJobSpec(index, metadataProvider, ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; JobUtils.runJob(hcc, jobSpec, true); @@ -1111,13 +1095,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen try { // Drop the files index from metadata MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); + datasetName, IndexingConstants.getFilesIndexName(datasetName)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e2) { e.addSuppressed(e2); abort(e, e2, mdTxnCtx); throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName - + "." + datasetName + "." + ExternalIndexingOperations.getFilesIndexName(datasetName) + + "." + datasetName + "." + IndexingConstants.getFilesIndexName(datasetName) + ") couldn't be removed from the metadata", e); } } @@ -1219,8 +1203,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen disconnectFeedBeforeDelete(dvId, activeEntityId, conn, metadataProvider, hcc); } // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob( - MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); + jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE + .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName()))); } } @@ -1230,47 +1214,41 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String datasetName = datasets.get(j).getDatasetName(); DatasetType dsType = datasets.get(j).getDatasetType(); if (dsType == DatasetType.INTERNAL) { - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, - datasetName); + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); for (int k = 0; k < indexes.size(); k++) { if (indexes.get(k).isSecondaryIndex()) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, - datasets.get(j))); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), + metadataProvider, datasets.get(j))); } } - - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); - jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); + Index primaryIndex = + MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName); + jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), primaryIndex, metadataProvider)); } else { // External dataset - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, - datasetName); + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); for (int k = 0; k < indexes.size(); k++) { if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, - metadataProvider, datasets.get(j))); - } else { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(k).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, + jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, datasets.get(j))); + } else { + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), + metadataProvider, datasets.get(j))); } } ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j)); } } - jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider)); + jobsToExecute.add(DataverseUtil.dropDataverseJobSpec(dv, metadataProvider)); // #. mark PendingDropOp on the dataverse record by // first, deleting the dataverse record from the DATAVERSE_DATASET // second, inserting the dataverse record with the PendingDropOp value into the // DATAVERSE_DATASET MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); MetadataManager.INSTANCE.addDataverse(mdTxnCtx, - new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP)); + new Dataverse(dataverseName, dv.getDataFormat(), MetadataUtil.PENDING_DROP_OP)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -1286,8 +1264,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. finally, delete the dataverse. MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName); - if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) { - activeDefaultDataverse = null; + if (activeDataverse != null && activeDataverse.getDataverseName() == dataverseName) { + activeDataverse = null; } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { @@ -1296,8 +1274,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) { - if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) { - activeDefaultDataverse = null; + if (activeDataverse != null && activeDataverse.getDataverseName() == dataverseName) { + activeDataverse = null; } // #. execute compensation operations @@ -1327,6 +1305,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } finally { MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName); + ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -1359,8 +1338,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); String datasetName = stmtDelete.getDatasetName().getValue(); MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); - MutableObject<MetadataTransactionContext> mdTxnCtx = new MutableObject<>( - MetadataManager.INSTANCE.beginTransaction()); + MutableObject<MetadataTransactionContext> mdTxnCtx = + new MutableObject<>(MetadataManager.INSTANCE.beginTransaction()); MutableBoolean bActiveTxn = new MutableBoolean(true); metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName); @@ -1372,13 +1351,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); return; } else { - throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " - + dataverseName + "."); + throw new AlgebricksException("There is no dataset with this name " + datasetName + + " in dataverse " + dataverseName + "."); } } - - doDropDataset(ds, datasetName, metadataProvider, mdTxnCtx, jobsToExecute, dataverseName, bActiveTxn, - progress, hcc); + ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); } catch (Exception e) { if (bActiveTxn.booleanValue()) { @@ -1415,6 +1392,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } finally { MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName); + ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -1434,25 +1412,23 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } // #. prepare jobs to drop the datatset and the indexes in NC - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, - datasetName); + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); for (int j = 0; j < indexes.size(); j++) { if (indexes.get(j).isSecondaryIndex()) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds)); } } - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); - jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); - + Index primaryIndex = + MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName); + jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, metadataProvider)); // #. mark the existing dataset as PendingDropOp MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName); MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), - ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP)); + ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); bActiveTxn.setValue(false); @@ -1475,17 +1451,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // External dataset ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); // #. prepare jobs to drop the datatset and the indexes in NC - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, - datasetName); + List<Index> indexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName); for (int j = 0; j < indexes.size(); j++) { if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds)); } else { - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, - indexes.get(j).getIndexName()); - jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds)); } } @@ -1495,7 +1467,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(), ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), - IMetadataEntity.PENDING_DROP_OP)); + MetadataUtil.PENDING_DROP_OP)); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue()); bActiveTxn.setValue(false); @@ -1573,15 +1545,15 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } // #. prepare a job to drop the index in NC. - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(index, metadataProvider, ds)); // #. mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), - index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); + new Index(dataverseName, datasetName, indexName, index.getIndexType(), + index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), + index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), + MetadataUtil.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1614,19 +1586,16 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException("Dropping a dataset's files index is not allowed."); } // #. prepare a job to drop the index in NC. - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); - jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds)); - List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, - datasetName); + jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(index, metadataProvider, ds)); + List<Index> datasetIndexes = + MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); if (datasetIndexes.size() == 2) { dropFilesIndex = true; // only one index + the files index, we need to delete both of the indexes for (Index externalIndex : datasetIndexes) { if (ExternalIndexingOperations.isFileIndex(externalIndex)) { - cds = new CompiledIndexDropStatement(dataverseName, datasetName, - externalIndex.getIndexName()); - jobsToExecute.add( - ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds)); + jobsToExecute + .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds)); // #. mark PendingDropOp on the existing files index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, externalIndex.getIndexName()); @@ -1635,7 +1604,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen externalIndex.getIndexType(), externalIndex.getKeyFieldNames(), externalIndex.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), externalIndex.isPrimaryIndex(), - IMetadataEntity.PENDING_DROP_OP)); + MetadataUtil.PENDING_DROP_OP)); } } } @@ -1643,9 +1612,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // #. mark PendingDropOp on the existing index MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); MetadataManager.INSTANCE.addIndex(mdTxnCtx, - new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), - index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(), - index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); + new Index(dataverseName, datasetName, indexName, index.getIndexType(), + index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(), + index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(), + MetadataUtil.PENDING_DROP_OP)); // #. commit the existing transaction before calling runJob. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -1666,7 +1636,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (dropFilesIndex) { // delete the files index too MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, - ExternalIndexingOperations.getFilesIndexName(datasetName)); + IndexingConstants.getFilesIndexName(datasetName)); MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds); ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds); } @@ -1698,14 +1668,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen datasetName, indexName); if (dropFilesIndex) { MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, - datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName)); + datasetName, IndexingConstants.getFilesIndexName(datasetName)); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e2) { e.addSuppressed(e2); abort(e, e2, mdTxnCtx); - throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "." - + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); + throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); } } @@ -1713,6 +1683,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } finally { MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, dataverseName + "." + datasetName); + ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } @@ -1769,8 +1740,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt) - throws Exception { + protected void handleCreateFunctionStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace()); cfs.getSignature().setNamespace(dataverse); @@ -1826,8 +1796,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { + protected void handleLoadStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) + throws Exception { LoadStatement loadStmt = (LoadStatement) stmt; String dataverseName = getActiveDataverse(loadStmt.getDataverseName()); String datasetName = loadStmt.getDatasetName().getValue(); @@ -1836,11 +1806,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setMetadataTxnContext(mdTxnCtx); MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName); try { - CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, - loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), - loadStmt.dataIsAlreadySorted()); - JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, - cls); + CompiledLoadFromFileStatement cls = + new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(), + loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); + JobSpecification spec = + apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null) { @@ -1894,8 +1864,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, - IHyracksClientConnection hcc) throws Exception { + public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) + throws Exception { DeleteStatement stmtDelete = (DeleteStatement) stmt; String dataverseName = getActiveDataverse(stmtDelete.getDataverseName()); @@ -1934,13 +1904,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen @Override public JobSpecification rewriteCompileQuery(IClusterInfoCollector clusterInfoCollector, - MetadataProvider metadataProvider, Query query, - ICompiledDmlStatement stmt) + MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt) throws RemoteException, AlgebricksException, ACIDException { // Query Rewriting (happens under the same ongoing metadata transaction) - Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, - metadataProvider, query, sessionConfig); + Pair<IReturningStatement, Integer> rewrittenResult = + apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionConfig); // Query Compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, @@ -1952,8 +1921,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throws RemoteException, AlgebricksException, ACIDException { // Insert/upsert statement rewriting (happens under the same ongoing metadata transaction) - Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, - metadataProvider, insertUpsert, sessionConfig); + Pair<IReturningStatement, Integer> rewrittenResult = + apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionConfig); InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first; String dataverseName = getActiveDataverse(rewrittenInsertUpsert.getDataverseName()); @@ -2049,8 +2018,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen boolean extendingExisting = cfps.getSourcePolicyName() != null; String description = cfps.getDescription() == null ? "" : cfps.getDescription(); if (extendingExisting) { - FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE - .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); + FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy( + metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); @@ -2105,8 +2074,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(feedId); + FeedEventsListener listener = + (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId); if (listener != null) { StringBuilder builder = new StringBuilder(); for (FeedConnectionId connectionId : listener.getConnections()) { @@ -2134,8 +2103,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleDropFeedPolicyStatement(MetadataProvider metadataProvider, Statement stmt) - throws Exception { + protected void handleDropFeedPolicyStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt; @@ -2176,8 +2144,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); FeedConnectionId feedConnId = null; EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName()); - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(entityId); + FeedEventsListener listener = + (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName + "." + feedName); try { @@ -2196,26 +2164,26 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new CompilationException("Feed " + cfs.getFeedName() + " is already connected to dataset " + cfs.getDatasetName().getValue()); } - FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), - mdTxnCtx); + FeedPolicyEntity feedPolicy = + FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(), mdTxnCtx); // All Metadata checks have passed. Feed connect request is valid. // if (listener == null) { listener = new FeedEventsListener(entityId); ActiveJobNotificationHandler.INSTANCE.registerListener(listener); } FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties()); - Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName, - feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx); + Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = + getFeedConnectionRequest(dataverseName, feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx); FeedConnectionRequest connectionRequest = triple.first; boolean createFeedIntakeJob = triple.second; listener.registerFeedEventSubscriber(eventSubscriber); subscriberRegistered = true; if (createFeedIntakeJob) { EntityId feedId = connectionRequest.getFeedJointKey().getFeedId(); - Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), - feedId.getEntityName()); - Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed, - metadataProvider, policyAccessor); + Feed primaryFeed = + MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()); + Pair<JobSpecification, IAdapterFactory> pair = + FeedOperations.buildFeedIntakeJobSpec(primaryFeed, metadataProvider, policyAccessor); // adapter configuration are valid at this stage // register the feed joints (these are auto-de-registered) int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length; @@ -2276,8 +2244,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen FeedRuntimeType connectionLocation; FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx); EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, feed.getFeedName()); - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(entityId); + FeedEventsListener listener = + (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); if (listener == null) { throw new CompilationException("Feed Listener is not registered"); } @@ -2339,8 +2307,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (sourceFeed.getAppliedFunction() != null) { appliedFunctions.add(0, sourceFeed.getAppliedFunction().getName()); } - Feed parentFeed = MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), - sourceFeed.getSourceFeedName()); + Feed parentFeed = + MetadataManager.INSTANCE.getFeed(ctx, feed.getDataverseName(), sourceFeed.getSourceFeedName()); sourceFeed = parentFeed; } @@ -2365,12 +2333,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName()); FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue()); IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber(); - FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE - .getActiveEntityListener(entityId); + FeedEventsListener listener = + (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId); if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) { - throw new CompilationException( - "Feed " + feed.getFeedId().getEntityName() + " is currently not connected to " - + cfs.getDatasetName().getValue() + ". Invalid operation!"); + throw new CompilationException("Feed " + feed.getFeedId().getEntityName() + + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); } listener.registerFeedEventSubscriber(eventSubscriber); MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName, @@ -2382,8 +2349,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new CompilationException( "Unknown dataset :" + cfs.getDatasetName().getValue() + " in dataverse " + dataverseName); } - Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations - .buildDisconnectFeedJobSpec(metadataProvider, connectionId); + Pair<JobSpecification, Boolean> specDisconnectType = + FeedOperations.buildDisconnectFeedJobSpec(connectionId); JobSpecification jobSpec = specDisconnectType.first; MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -2415,8 +2382,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt; bfs.initialize(metadataProvider.getMetadataTxnContext()); - CompiledSubscribeFeedStatement csfs = new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), - bfs.getVarCounter()); + CompiledSubscribeFeedStatement csfs = + new CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), bfs.getVarCounter()); metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE); metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, "" + bfs.getPolicy()); metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS, @@ -2445,8 +2412,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (jobSpec != null) { FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId()); - FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(), - null, ActivityState.ACTIVE, + FeedConnectJobInfo activeJob = new FeedConnectJobInfo( + bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE, new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset), listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties()); alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob); @@ -2486,10 +2453,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ds.getItemTypeDataverseName(), itemTypeName); ARecordType metaRecordType = null; if (ds.hasMetaPart()) { - metaRecordType = (ARecordType) MetadataManager.INSTANCE - .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getMetaItemTypeDataverseName(), - ds.getMetaItemTypeName()) - .getDatatype(); + metaRecordType = + (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), + ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName()).getDatatype(); } // Prepare jobs to compact the datatset and its indexes List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); @@ -2497,27 +2463,24 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException( "Cannot compact the extrenal dataset " + datasetName + " because it has no indexes"); } - Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), - dataverseName); - jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); + Dataverse dataverse = + MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); + jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); ARecordType aRecordType = (ARecordType) dt.getDatatype(); - ARecordType enforcedType = createEnforcedType(aRecordType, indexes); + Pair<ARecordType, ARecordType> enforcedTypes = + TypeUtil.createEnforcedType(aRecordType, metaRecordType, indexes); + ARecordType enforcedType = enforcedTypes.first; + ARecordType enforcedMeta = enforcedTypes.second; if (ds.getDatasetType() == DatasetType.INTERNAL) { for (int j = 0; j < indexes.size(); j++) { if (indexes.get(j).isSecondaryIndex()) { - CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName, - datasetName, indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), - indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), - indexes.get(j).getGramLength(), indexes.get(j).getIndexType()); - List<Integer> keySourceIndicators = indexes.get(j).getKeyFieldSourceIndicators(); - - jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType, - metaRecordType, keySourceIndicators, enforcedType, metadataProvider)); + jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType, + metaRecordType, enforcedType, enforcedMeta, metadataProvider)); } } } else { - prepareCompactJobsForExternalDataset(indexes, dataverseName, datasetName, ds, jobsToExecute, - aRecordType, metaRecordType, metadataProvider, enforcedType); + prepareCompactJobsForExternalDataset(indexes, ds, jobsToExecute, aRecordType, metaRecordType, + metadataProvider, enforcedType, enforcedMeta); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -2533,28 +2496,23 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } finally { MetadataLockManager.INSTANCE.compactEnd(dataverseName, dataverseName + "." + datasetName); + ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } } - protected void prepareCompactJobsForExternalDataset(List<Index> indexes, String dataverseName, String datasetName, - Dataset ds, List<JobSpecification> jobsToExecute, ARecordType aRecordType, ARecordType metaRecordType, - MetadataProvider metadataProvider, ARecordType enforcedType) throws AlgebricksException { + protected void prepareCompactJobsForExternalDataset(List<Index> indexes, Dataset ds, + List<JobSpecification> jobsToExecute, ARecordType aRecordType, ARecordType metaRecordType, + MetadataProvider metadataProvider, ARecordType enforcedType, ARecordType enforcedMeta) + throws AlgebricksException { for (int j = 0; j < indexes.size(); j++) { if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) { - CompiledIndexCompactStatement cics = new CompiledIndexCompactStatement(dataverseName, datasetName, - indexes.get(j).getIndexName(), indexes.get(j).getKeyFieldNames(), - indexes.get(j).getKeyFieldTypes(), indexes.get(j).isEnforcingKeyFileds(), - indexes.get(j).getGramLength(), indexes.get(j).getIndexType()); - List<Integer> keySourceIndicators = null; - if (ds.hasMetaPart()) { - keySourceIndicators = indexes.get(j).getKeyFieldSourceIndicators(); - } - jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, aRecordType, metaRecordType, - keySourceIndicators, enforcedType, metadataProvider)); + jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, indexes.get(j), aRecordType, + metaRecordType, enforcedType, enforcedMeta, metadataProvider)); } } - jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider)); + jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider, + new StorageComponentProvider())); } protected JobSpecification handleQuery(MetadataProvider metadataProvider, Query query, @@ -2563,7 +2521,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets()); + MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets()); try { JobSpecification jobSpec = rewriteCompileQuery(hcc, metadataProvider, query, null); @@ -2598,8 +2556,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ResultHandle hand; switch (resultDelivery) { case ASYNC: - hand = new ResultHandle(jobId,metadataProvider.getResultSetId()); - ResultUtil.printResultHandle(hand,sessionConfig); + hand = new ResultHandle(jobId, metadataProvider.getResultSetId()); + ResultUtil.printResultHandle(hand, sessionConfig); hcc.waitForCompletion(jobId); sessionConfig.out().flush(); break; @@ -2611,8 +2569,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case DEFERRED: hcc.waitForCompletion(jobId); - hand = new ResultHandle(jobId,metadataProvider.getResultSetId()); - ResultUtil.printResultHandle(hand,sessionConfig); + hand = new ResultHandle(jobId, metadataProvider.getResultSetId()); + ResultUtil.printResultHandle(hand, sessionConfig); sessionConfig.out().flush(); break; default: @@ -2620,8 +2578,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } - protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) - throws Exception { + protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception { NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); @@ -2658,7 +2615,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt; String dataverseName = getActiveDataverse(stmtRefresh.getDataverseName()); String datasetName = stmtRefresh.getDatasetName().getValue(); - ExternalDatasetTransactionState transactionState = ExternalDatasetTransactionState.COMMIT; + TransactionState transactionState = TransactionState.COMMIT; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, dataverseName + "." + datasetName); boolean bActiveTxn = true; @@ -2738,20 +2695,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } // Create the files index update job - spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles, - appendedFiles, metadataProvider); + spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, addedFiles, appendedFiles, + metadataProvider); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - transactionState = ExternalDatasetTransactionState.BEGIN; + transactionState = TransactionState.BEGIN; // run the files update job JobUtils.runJob(hcc, spec, true); for (Index index : indexes) { if (!ExternalIndexingOperations.isFileIndex(index)) { - spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles, - addedFiles, appendedFiles, metadataProvider); + spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles, + appendedFiles, metadataProvider); // run the files update job JobUtils.runJob(hcc, spec, true); } @@ -2765,12 +2722,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setMetadataTxnContext(mdTxnCtx); bActiveTxn = true; ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()) - .setState(ExternalDatasetTransactionState.READY_TO_COMMIT); + .setState(TransactionState.READY_TO_COMMIT); ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()).setRefreshTimestamp(txnTime); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT; + transactionState = TransactionState.READY_TO_COMMIT; // We don't release the latch since this job is expected to be quick JobUtils.runJob(hcc, spec, true); // Start a new metadata transaction to record the final state of the transaction @@ -2779,9 +2736,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen bActiveTxn = true; for (ExternalFile file : metadataFiles) { - if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) { + if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); - } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) { + } else if (file.getPendingOp() == ExternalFilePendingOp.NO_OP) { Iterator<ExternalFile> iterator = appendedFiles.iterator(); while (iterator.hasNext()) { ExternalFile appendedFile = iterator.next(); @@ -2792,7 +2749,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile); // add the original file with appended information appendedFile.setFileNumber(file.getFileNumber()); - appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); + appendedFile.setPendingOp(ExternalFilePendingOp.NO_OP); MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile); iterator.remove(); } @@ -2808,13 +2765,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // insert new files for (ExternalFile file : addedFiles) { MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file); - file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP); + file.setPendingOp(ExternalFilePendingOp.NO_OP); MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file); } // mark the transaction as complete - ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()) - .setState(ExternalDatasetTransactionState.COMMIT); + ((ExternalDatasetDetails) transactionDataset.getDatasetDetails()).setState(TransactionState.COMMIT); MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset); // commit metadata transaction @@ -2824,15 +2780,15 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (bActiveTxn) { abort(e, e, mdTxnCtx); } - if (transactionState == ExternalDatasetTransactionState.READY_TO_COMMIT) { + if (transactionState == TransactionState.READY_TO_COMMIT) { throw new IllegalStateException("System is inconsistent state: commit of (" + dataverseName + "." + datasetName + ") refresh couldn't carry out the commit phase", e); } - if (transactionState == ExternalDatasetTransactionState.COMMIT) { + if (transactionState == TransactionState.COMMIT) { // Nothing to do , everything should be clean th
<TRUNCATED>
