http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index bb65b74..6fbf2a5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -25,13 +25,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.rmi.RemoteException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,22 +44,23 @@ import 
org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.app.external.ExternalIndexingOperations;
 import org.apache.asterix.app.external.FeedJoint;
-import org.apache.asterix.app.external.FeedOperations;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.result.ResultUtil;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import 
org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
@@ -81,10 +79,9 @@ import 
org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.file.DatasetOperations;
-import org.apache.asterix.file.DataverseOperations;
-import org.apache.asterix.file.IndexOperations;
+import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
 import org.apache.asterix.lang.common.base.IReturningStatement;
@@ -132,7 +129,6 @@ import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import 
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
@@ -151,26 +147,25 @@ import 
org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.feeds.FeedOperations;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.metadata.utils.KeyFieldTypeUtils;
+import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
+import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataLockManager;
+import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.metadata.utils.TypeUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.runtime.util.AppContextInfo;
+import org.apache.asterix.runtime.utils.AppContextInfo;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
@@ -180,9 +175,8 @@ import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
-import org.apache.asterix.util.FlushDatasetUtils;
-import org.apache.asterix.util.JobUtils;
-import org.apache.commons.lang3.ArrayUtils;
+import org.apache.asterix.utils.DataverseUtil;
+import org.apache.asterix.utils.FlushDatasetUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -216,27 +210,24 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     private static final Logger LOGGER = 
Logger.getLogger(QueryTranslator.class.getName());
 
-    protected enum ProgressState {
-        NO_PROGRESS,
-        ADDED_PENDINGOP_RECORD_TO_METADATA
-    }
-
     public static final boolean IS_DEBUG_MODE = false;// true
     protected final List<Statement> statements;
     protected final SessionConfig sessionConfig;
-    protected Dataverse activeDefaultDataverse;
+    protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
+    protected final IStorageComponentProvider componentProvider;
 
     public QueryTranslator(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider) {
+            ILangCompilationProvider compliationProvider, 
IStorageComponentProvider componentProvider) {
         this.statements = statements;
         this.sessionConfig = conf;
-        this.declaredFunctions = getDeclaredFunctions(statements);
-        this.apiFramework = new APIFramework(compliationProvider);
-        this.rewriterFactory = compliationProvider.getRewriterFactory();
-        activeDefaultDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+        this.componentProvider = componentProvider;
+        declaredFunctions = getDeclaredFunctions(statements);
+        apiFramework = new APIFramework(compliationProvider);
+        rewriterFactory = compliationProvider.getRewriterFactory();
+        activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
     }
 
     protected List<FunctionDecl> getDeclaredFunctions(List<Statement> 
statements) {
@@ -286,9 +277,9 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
                     
sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
                 }
-                validateOperation(activeDefaultDataverse, stmt);
+                validateOperation(activeDataverse, stmt);
                 rewriteStatement(stmt); // Rewrite the statement's AST.
-                MetadataProvider metadataProvider = new 
MetadataProvider(activeDefaultDataverse);
+                MetadataProvider metadataProvider = new 
MetadataProvider(activeDataverse, componentProvider);
                 metadataProvider.setWriterFactory(writerFactory);
                 
metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
                 metadataProvider.setOutputFile(outputFile);
@@ -298,7 +289,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         handleSetStatement(stmt, config);
                         break;
                     case Statement.Kind.DATAVERSE_DECL:
-                        activeDefaultDataverse = 
handleUseDataverseStatement(metadataProvider, stmt);
+                        activeDataverse = 
handleUseDataverseStatement(metadataProvider, stmt);
                         break;
                     case Statement.Kind.CREATE_DATAVERSE:
                         handleCreateDataverseStatement(metadataProvider, stmt);
@@ -375,8 +366,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         break;
                     case Statement.Kind.QUERY:
                         metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
-                        metadataProvider.setResultAsyncMode(resultDelivery == 
ResultDelivery.ASYNC
-                                || resultDelivery == ResultDelivery.DEFERRED);
+                        metadataProvider.setResultAsyncMode(
+                                resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED);
                         handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, stats);
                         break;
                     case Statement.Kind.COMPACT:
@@ -450,8 +441,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         }
     }
 
-    protected void handleCreateDataverseStatement(MetadataProvider 
metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleCreateDataverseStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
 
         CreateDataverseStatement stmtCreateDataverse = 
(CreateDataverseStatement) stmt;
         String dvName = stmtCreateDataverse.getDataverseName().getValue();
@@ -470,7 +460,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 }
             }
             
MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
-                    new Dataverse(dvName, stmtCreateDataverse.getFormat(), 
IMetadataEntity.PENDING_NO_OP));
+                    new Dataverse(dvName, stmtCreateDataverse.getFormat(), 
MetadataUtil.PENDING_NO_OP));
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -488,8 +478,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             throw new CompilationException("Unknown compaction policy: " + 
compactionPolicy);
         }
         String compactionPolicyFactoryClassName = 
compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) 
Class
-                .forName(compactionPolicyFactoryClassName).newInstance();
+        ILSMMergePolicyFactory mergePolicyFactory =
+                (ILSMMergePolicyFactory) 
Class.forName(compactionPolicyFactoryClassName).newInstance();
         if (isExternalDataset && 
mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
             throw new CompilationException("The correlated-prefix merge policy 
cannot be used with external dataset.");
         }
@@ -538,6 +528,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 metaItemTypeDataverseName + "." + metaItemTypeName, 
nodegroupName, compactionPolicy,
                 dataverseName + "." + datasetName, defaultCompactionPolicy);
         Dataset dataset = null;
+        Index primaryIndex = null;
         try {
 
             IDatasetDetails datasetDetails = null;
@@ -556,8 +547,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " 
could not be found.");
             }
-            String ngName = ngNameId != null ? ngNameId.getValue()
-                    : configureNodegroupForDataset(dd, dataverseName, 
mdTxnCtx);
+            String ngName =
+                    ngNameId != null ? ngNameId.getValue() : 
configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -581,10 +572,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     }
                     ARecordType metaRecType = (ARecordType) metaItemType;
 
-                    List<List<String>> partitioningExprs = 
((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getPartitioningExprs();
-                    List<Integer> keySourceIndicators = ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl())
-                            .getKeySourceIndicators();
+                    List<List<String>> partitioningExprs =
+                            ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getPartitioningExprs();
+                    List<Integer> keySourceIndicators =
+                            ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getKeySourceIndicators();
                     boolean autogenerated = ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).isAutogenerated();
                     ARecordType aRecordType = (ARecordType) itemType;
                     List<IAType> partitioningTypes = 
ValidateUtil.validatePartitioningExpressions(aRecordType,
@@ -607,10 +598,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getProperties();
+                    Map<String, String> properties =
+                            ((ExternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getProperties();
 
-                    datasetDetails = new ExternalDatasetDetails(adapter, 
properties, new Date(),
-                            ExternalDatasetTransactionState.COMMIT);
+                    datasetDetails =
+                            new ExternalDatasetDetails(adapter, properties, 
new Date(), TransactionState.COMMIT);
                     break;
                 default:
                     throw new CompilationException("Unknown datatype " + 
dd.getDatasetType());
@@ -625,14 +617,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             dataset = new Dataset(dataverseName, datasetName, 
itemTypeDataverseName, itemTypeName,
                     metaItemTypeDataverseName, metaItemTypeName, ngName, 
compactionPolicy, compactionPolicyProperties,
                     datasetDetails, dd.getHints(), dsType, 
DatasetIdFactory.generateDatasetId(),
-                    IMetadataEntity.PENDING_ADD_OP);
+                    MetadataUtil.PENDING_ADD_OP);
             
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), 
dataset);
-
+            primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataverseName, datasetName, datasetName);
             if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse = 
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                        dataverseName);
-                JobSpecification jobSpec = 
DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-                        metadataProvider);
+                Dataverse dataverse =
+                        
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), 
dataverseName);
+                JobSpecification jobSpec = 
DatasetUtil.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
 
                 // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -650,7 +641,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
 
             // #. add a new dataset with PendingNoOp after deleting the 
dataset with PendingAddOp
             
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName);
-            dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            dataset.setPendingOp(MetadataUtil.PENDING_NO_OP);
             
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), 
dataset);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -668,9 +659,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledDatasetDropStatement cds = new 
CompiledDatasetDropStatement(dataverseName, datasetName);
                 try {
-                    JobSpecification jobSpec = 
DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
+                    JobSpecification jobSpec = 
DatasetUtil.dropDatasetJobSpec(dataset, primaryIndex, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     JobUtils.runJob(hcc, jobSpec, true);
@@ -735,8 +725,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String 
dataverse, MetadataTransactionContext mdTxnCtx)
-            throws CompilationException {
+    protected String configureNodegroupForDataset(DatasetDecl dd, String 
dataverse,
+            MetadataTransactionContext mdTxnCtx) throws CompilationException {
         int nodegroupCardinality;
         String nodegroupName;
         String hintValue = 
dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
@@ -802,11 +792,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         JobSpecification spec = null;
         Dataset ds = null;
         // For external datasets
-        ArrayList<ExternalFile> externalFilesSnapshot = null;
+        List<ExternalFile> externalFilesSnapshot = null;
         boolean firstExternalDatasetIndex = false;
         boolean filesIndexReplicated = false;
         Index filesIndex = null;
         boolean datasetLocked = false;
+        Index index = null;
         try {
             ds = 
MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                     datasetName);
@@ -816,7 +807,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             }
 
             indexName = stmtCreateIndex.getIndexName().getValue();
-            Index idx = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
+            index = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                     datasetName, indexName);
             Datatype dt = 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), ds.getItemTypeName());
@@ -833,8 +824,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             int keyIndex = 0;
             for (Pair<List<String>, TypeExpression> fieldExpr : 
stmtCreateIndex.getFieldExprs()) {
                 IAType fieldType = null;
-                ARecordType subType = 
KeyFieldTypeUtils.chooseSource(keySourceIndicators, keyIndex, aRecordType,
-                        metaRecordType);
+                ARecordType subType =
+                        KeyFieldTypeUtil.chooseSource(keySourceIndicators, 
keyIndex, aRecordType, metaRecordType);
                 boolean isOpen = subType.isOpen();
                 int i = 0;
                 if (fieldExpr.first.size() > 1 && !isOpen) {
@@ -858,8 +849,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                     if (stmtCreateIndex.hasMetaField()) {
                         throw new AlgebricksException("Typed open index can 
only be created on the record part");
                     }
-                    Map<TypeSignature, IAType> typeMap = 
TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second,
-                            indexName, dataverseName);
+                    Map<TypeSignature, IAType> typeMap =
+                            TypeTranslator.computeTypes(mdTxnCtx, 
fieldExpr.second, indexName, dataverseName);
                     TypeSignature typeSignature = new 
TypeSignature(dataverseName, indexName);
                     fieldType = typeMap.get(typeSignature);
                 }
@@ -876,7 +867,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             ValidateUtil.validateKeyFields(aRecordType, metaRecordType, 
indexFields, keySourceIndicators,
                     indexFieldTypes, stmtCreateIndex.getIndexType());
 
-            if (idx != null) {
+            if (index != null) {
                 if (stmtCreateIndex.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     return;
@@ -893,7 +884,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                     || stmtCreateIndex.getIndexType() == 
IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == 
IndexType.LENGTH_PARTITIONED_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == 
IndexType.LENGTH_PARTITIONED_NGRAM_INVIX) {
-                List<List<String>> partitioningKeys = 
DatasetUtils.getPartitioningKeys(ds);
+                List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(ds);
                 for (List<String> partitioningKey : partitioningKeys) {
                     IAType keyType = 
aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
@@ -925,7 +916,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
 
                 // Check if the files index exist
                 filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                        datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
+                        datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = filesIndex == null;
                 // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, 
firstExternalDatasetIndex);
@@ -933,7 +924,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 if (firstExternalDatasetIndex) {
                     // Verify that no one has created an index before we 
acquire the lock
                     filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            dataverseName, datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
+                            dataverseName, datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
                     if (filesIndex != null) {
                         ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, 
firstExternalDatasetIndex);
                         firstExternalDatasetIndex = false;
@@ -944,11 +935,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     // Get snapshot from External File System
                     externalFilesSnapshot = 
ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
                     // Add an entry for the files index
-                    filesIndex = new Index(dataverseName, datasetName,
-                            
ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, 
null,
-                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, 
false, false,
-                            IMetadataEntity.PENDING_ADD_OP);
+                    filesIndex =
+                            new Index(dataverseName, datasetName, 
IndexingConstants.getFilesIndexName(datasetName),
+                                    IndexType.BTREE, 
ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+                                    
ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+                                    MetadataUtil.PENDING_ADD_OP);
                     
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
                     // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
@@ -970,33 +961,35 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (stmtCreateIndex.isEnforced()) {
                 List<Index> indexes = MetadataManager.INSTANCE
                         
.getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, 
datasetName);
-                for (Index index : indexes) {
-                    if (index.getKeyFieldNames().equals(indexFields)
-                            && 
!index.getKeyFieldTypes().equals(indexFieldTypes) && 
index.isEnforcingKeyFileds()) {
+                for (Index existingIndex : indexes) {
+                    if (existingIndex.getKeyFieldNames().equals(indexFields)
+                            && 
!existingIndex.getKeyFieldTypes().equals(indexFieldTypes)
+                            && existingIndex.isEnforcingKeyFileds()) {
                         throw new CompilationException("Cannot create index " 
+ indexName + " , enforced index "
-                                + index.getIndexName() + " on field \"" + 
StringUtils.join(indexFields, ',')
-                                + "\" is already defined with type \"" + 
index.getKeyFieldTypes() + "\"");
+                                + existingIndex.getIndexName() + " on field 
\"" + StringUtils.join(indexFields, ',')
+                                + "\" is already defined with type \"" + 
existingIndex.getKeyFieldTypes() + "\"");
                     }
                 }
             }
 
             // #. add a new index with PendingAddOp
-            Index index = new Index(dataverseName, datasetName, indexName, 
stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, 
stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
-                    false, IMetadataEntity.PENDING_ADD_OP);
+            index = new Index(dataverseName, datasetName, indexName, 
stmtCreateIndex.getIndexType(), indexFields,
+                    keySourceIndicators, indexFieldTypes, 
stmtCreateIndex.getGramLength(),
+                    stmtCreateIndex.isEnforced(), false, 
MetadataUtil.PENDING_ADD_OP);
             
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
index);
 
             ARecordType enforcedType = null;
+            ARecordType enforcedMetaType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = createEnforcedType(aRecordType, 
Lists.newArrayList(index));
+                Pair<ARecordType, ARecordType> enforcedTypes =
+                        TypeUtil.createEnforcedType(aRecordType, 
metaRecordType, Lists.newArrayList(index));
+                enforcedType = enforcedTypes.first;
+                enforcedMetaType = enforcedTypes.second;
             }
 
             // #. prepare to create the index artifact in NC.
-            CompiledCreateIndexStatement cis = new 
CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-                    index.getDatasetName(), index.getKeyFieldNames(), 
index.getKeyFieldTypes(),
-                    index.isEnforcingKeyFileds(), index.getGramLength(), 
index.getIndexType());
-            spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, 
aRecordType, metaRecordType,
-                    keySourceIndicators, enforcedType, metadataProvider);
+            spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, 
aRecordType, metaRecordType, enforcedType,
+                    enforcedMetaType, metadataProvider);
             if (spec == null) {
                 throw new CompilationException("Failed to create job spec for 
creating index '"
                         + stmtCreateIndex.getDatasetName() + "." + 
stmtCreateIndex.getIndexName() + "'");
@@ -1014,12 +1007,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. load data into the index in NC.
-            cis = new CompiledCreateIndexStatement(index.getIndexName(), 
dataverseName, index.getDatasetName(),
-                    index.getKeyFieldNames(), index.getKeyFieldTypes(), 
index.isEnforcingKeyFileds(),
-                    index.getGramLength(), index.getIndexType());
-
-            spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, 
aRecordType, metaRecordType,
-                    keySourceIndicators, enforcedType, metadataProvider);
+            spec = IndexUtil.buildSecondaryIndexLoadingJobSpec(ds, index, 
aRecordType, metaRecordType, enforcedType,
+                    enforcedMetaType, metadataProvider);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
@@ -1033,14 +1022,14 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // #. add another new index with PendingNoOp after deleting the 
index with PendingAddOp
             
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
                     indexName);
-            index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+            index.setPendingOp(MetadataUtil.PENDING_NO_OP);
             
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
index);
             // add another new files index with PendingNoOp after deleting the 
index with
             // PendingAddOp
             if (firstExternalDatasetIndex) {
-                
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
-                        filesIndex.getIndexName());
-                filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
+                
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
+                        datasetName, filesIndex.getIndexName());
+                filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
                 
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
                 // update transaction timestamp
                 ((ExternalDatasetDetails) 
ds.getDatasetDetails()).setRefreshTimestamp(new Date());
@@ -1056,11 +1045,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
-                CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                        
ExternalIndexingOperations.getFilesIndexName(datasetName));
                 try {
-                    JobSpecification jobSpec = 
ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                            metadataProvider, ds);
+                    JobSpecification jobSpec =
+                            
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     JobUtils.runJob(hcc, jobSpec, true);
@@ -1078,11 +1065,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 try {
-                    JobSpecification jobSpec = 
IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider,
-                            ds);
-
+                    JobSpecification jobSpec = 
IndexUtil.buildDropSecondaryIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     JobUtils.runJob(hcc, jobSpec, true);
@@ -1111,13 +1095,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     try {
                         // Drop the files index from metadata
                         
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                                datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
+                                datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
                         throw new IllegalStateException("System is 
inconsistent state: pending index(" + dataverseName
-                                + "." + datasetName + "." + 
ExternalIndexingOperations.getFilesIndexName(datasetName)
+                                + "." + datasetName + "." + 
IndexingConstants.getFilesIndexName(datasetName)
                                 + ") couldn't be removed from the metadata", 
e);
                     }
                 }
@@ -1219,8 +1203,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                         disconnectFeedBeforeDelete(dvId, activeEntityId, conn, 
metadataProvider, hcc);
                     }
                     // prepare job to remove feed log storage
-                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
-                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverseName, activeEntityId.getEntityName())));
+                    
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE
+                            .getFeed(mdTxnCtx, dataverseName, 
activeEntityId.getEntityName())));
                 }
             }
 
@@ -1230,47 +1214,41 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 String datasetName = datasets.get(j).getDatasetName();
                 DatasetType dsType = datasets.get(j).getDatasetType();
                 if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
+                    List<Index> indexes =
+                            
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider,
-                                    datasets.get(j)));
+                            
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
+                                    metadataProvider, datasets.get(j)));
                         }
                     }
-
-                    CompiledDatasetDropStatement cds = new 
CompiledDatasetDropStatement(dataverseName, datasetName);
-                    
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, 
metadataProvider));
+                    Index primaryIndex =
+                            MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataverseName, datasetName, datasetName);
+                    
jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(datasets.get(j), primaryIndex, 
metadataProvider));
                 } else {
                     // External dataset
-                    List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                            datasetName);
+                    List<Index> indexes =
+                            
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if 
(ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds,
-                                    metadataProvider, datasets.get(j)));
-                        } else {
-                            CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                                    indexes.get(k).getIndexName());
-                            
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider,
+                            
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
                                     datasets.get(j)));
+                        } else {
+                            
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
+                                    metadataProvider, datasets.get(j)));
                         }
                     }
                     
ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
                 }
             }
-            
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, 
metadataProvider));
+            jobsToExecute.add(DataverseUtil.dropDataverseJobSpec(dv, 
metadataProvider));
             // #. mark PendingDropOp on the dataverse record by
             // first, deleting the dataverse record from the DATAVERSE_DATASET
             // second, inserting the dataverse record with the PendingDropOp 
value into the
             // DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
-                    new Dataverse(dataverseName, dv.getDataFormat(), 
IMetadataEntity.PENDING_DROP_OP));
+                    new Dataverse(dataverseName, dv.getDataFormat(), 
MetadataUtil.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1286,8 +1264,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
             // #. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
-            if (activeDefaultDataverse != null && 
activeDefaultDataverse.getDataverseName() == dataverseName) {
-                activeDefaultDataverse = null;
+            if (activeDataverse != null && activeDataverse.getDataverseName() 
== dataverseName) {
+                activeDataverse = null;
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -1296,8 +1274,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                if (activeDefaultDataverse != null && 
activeDefaultDataverse.getDataverseName() == dataverseName) {
-                    activeDefaultDataverse = null;
+                if (activeDataverse != null && 
activeDataverse.getDataverseName() == dataverseName) {
+                    activeDataverse = null;
                 }
 
                 // #. execute compensation operations
@@ -1327,6 +1305,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throw e;
         } finally {
             
MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
+            
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
@@ -1359,8 +1338,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String dataverseName = 
getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
         MutableObject<ProgressState> progress = new 
MutableObject<>(ProgressState.NO_PROGRESS);
-        MutableObject<MetadataTransactionContext> mdTxnCtx = new 
MutableObject<>(
-                MetadataManager.INSTANCE.beginTransaction());
+        MutableObject<MetadataTransactionContext> mdTxnCtx =
+                new 
MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
         MutableBoolean bActiveTxn = new MutableBoolean(true);
         metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
         MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, 
dataverseName + "." + datasetName);
@@ -1372,13 +1351,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with 
this name " + datasetName + " in dataverse "
-                            + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with 
this name " + datasetName
+                            + " in dataverse " + dataverseName + ".");
                 }
             }
-
-            doDropDataset(ds, datasetName, metadataProvider, mdTxnCtx, 
jobsToExecute, dataverseName, bActiveTxn,
-                    progress, hcc);
+            ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, 
progress, hcc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
         } catch (Exception e) {
             if (bActiveTxn.booleanValue()) {
@@ -1415,6 +1392,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, 
dataverseName + "." + datasetName);
+            
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
@@ -1434,25 +1412,23 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
 
             // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName,
-                    datasetName);
+            List<Index> indexes =
+                    
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, 
datasetName);
             for (int j = 0; j < indexes.size(); j++) {
                 if (indexes.get(j).isSecondaryIndex()) {
-                    CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                            indexes.get(j).getIndexName());
-                    
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
+                    
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), 
metadataProvider, ds));
                 }
             }
-            CompiledDatasetDropStatement cds = new 
CompiledDatasetDropStatement(dataverseName, datasetName);
-            jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, 
metadataProvider));
-
+            Index primaryIndex =
+                    MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), 
dataverseName, datasetName, datasetName);
+            jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, 
metadataProvider));
             // #. mark the existing dataset as PendingDropOp
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), 
dataverseName, datasetName);
             MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
                     new Dataset(dataverseName, datasetName, 
ds.getItemTypeDataverseName(), ds.getItemTypeName(),
                             ds.getMetaItemTypeDataverseName(), 
ds.getMetaItemTypeName(), ds.getNodeGroupName(),
                             ds.getCompactionPolicy(), 
ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
-                            ds.getHints(), ds.getDatasetType(), 
ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
+                            ds.getHints(), ds.getDatasetType(), 
ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
             bActiveTxn.setValue(false);
@@ -1475,17 +1451,13 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // External dataset
             ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
             // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName,
-                    datasetName);
+            List<Index> indexes =
+                    
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, 
datasetName);
             for (int j = 0; j < indexes.size(); j++) {
                 if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                    CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                            indexes.get(j).getIndexName());
-                    
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
+                    
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), 
metadataProvider, ds));
                 } else {
-                    CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                            indexes.get(j).getIndexName());
-                    
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, 
metadataProvider, ds));
+                    
jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
 ds));
                 }
             }
 
@@ -1495,7 +1467,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     new Dataset(dataverseName, datasetName, 
ds.getItemTypeDataverseName(), ds.getItemTypeName(),
                             ds.getNodeGroupName(), ds.getCompactionPolicy(), 
ds.getCompactionPolicyProperties(),
                             ds.getDatasetDetails(), ds.getHints(), 
ds.getDatasetType(), ds.getDatasetId(),
-                            IMetadataEntity.PENDING_DROP_OP));
+                            MetadataUtil.PENDING_DROP_OP));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
             bActiveTxn.setValue(false);
@@ -1573,15 +1545,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     }
                 }
                 // #. prepare a job to drop the index in NC.
-                CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
+                
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(index, 
metadataProvider, ds));
 
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, 
index.getIndexType(), index.getKeyFieldNames(),
-                                index.getKeyFieldSourceIndicators(), 
index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), 
index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, 
index.getIndexType(),
+                                index.getKeyFieldNames(), 
index.getKeyFieldSourceIndicators(),
+                                index.getKeyFieldTypes(), 
index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1614,19 +1586,16 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     throw new AlgebricksException("Dropping a dataset's files 
index is not allowed.");
                 }
                 // #. prepare a job to drop the index in NC.
-                CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-                
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
-                List<Index> datasetIndexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
-                        datasetName);
+                
jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(index, 
metadataProvider, ds));
+                List<Index> datasetIndexes =
+                        MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, 
dataverseName, datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
                     // only one index + the files index, we need to delete 
both of the indexes
                     for (Index externalIndex : datasetIndexes) {
                         if 
(ExternalIndexingOperations.isFileIndex(externalIndex)) {
-                            cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
-                                    externalIndex.getIndexName());
-                            jobsToExecute.add(
-                                    
ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, 
ds));
+                            jobsToExecute
+                                    
.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, 
ds));
                             // #. mark PendingDropOp on the existing files 
index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, 
dataverseName, datasetName,
                                     externalIndex.getIndexName());
@@ -1635,7 +1604,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                                             externalIndex.getIndexType(), 
externalIndex.getKeyFieldNames(),
                                             
externalIndex.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
                                             index.isEnforcingKeyFileds(), 
externalIndex.isPrimaryIndex(),
-                                            IMetadataEntity.PENDING_DROP_OP));
+                                            MetadataUtil.PENDING_DROP_OP));
                         }
                     }
                 }
@@ -1643,9 +1612,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, 
index.getIndexType(), index.getKeyFieldNames(),
-                                index.getKeyFieldSourceIndicators(), 
index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), 
index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, 
index.getIndexType(),
+                                index.getKeyFieldNames(), 
index.getKeyFieldSourceIndicators(),
+                                index.getKeyFieldTypes(), 
index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
+                                MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1666,7 +1636,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 if (dropFilesIndex) {
                     // delete the files index too
                     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, 
dataverseName, datasetName,
-                            
ExternalIndexingOperations.getFilesIndexName(datasetName));
+                            IndexingConstants.getFilesIndexName(datasetName));
                     
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
                 }
@@ -1698,14 +1668,14 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                             datasetName, indexName);
                     if (dropFilesIndex) {
                         
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                                datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
+                                datasetName, 
IndexingConstants.getFilesIndexName(datasetName));
                     }
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent 
state: pending index(" + dataverseName + "."
-                            + datasetName + "." + indexName + ") couldn't be 
removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent 
state: pending index(" + dataverseName
+                            + "." + datasetName + "." + indexName + ") 
couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1713,6 +1683,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         } finally {
             MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, 
dataverseName + "." + datasetName);
+            
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
@@ -1769,8 +1740,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleCreateFunctionStatement(MetadataProvider 
metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleCreateFunctionStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
         CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
         String dataverse = 
getActiveDataverseName(cfs.getSignature().getNamespace());
         cfs.getSignature().setNamespace(dataverse);
@@ -1826,8 +1796,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleLoadStatement(MetadataProvider metadataProvider, 
Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+    protected void handleLoadStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc)
+            throws Exception {
         LoadStatement loadStmt = (LoadStatement) stmt;
         String dataverseName = getActiveDataverse(loadStmt.getDataverseName());
         String datasetName = loadStmt.getDatasetName().getValue();
@@ -1836,11 +1806,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, 
dataverseName + "." + datasetName);
         try {
-            CompiledLoadFromFileStatement cls = new 
CompiledLoadFromFileStatement(dataverseName,
-                    loadStmt.getDatasetName().getValue(), 
loadStmt.getAdapter(), loadStmt.getProperties(),
-                    loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionConfig,
-                    cls);
+            CompiledLoadFromFileStatement cls =
+                    new CompiledLoadFromFileStatement(dataverseName, 
loadStmt.getDatasetName().getValue(),
+                            loadStmt.getAdapter(), loadStmt.getProperties(), 
loadStmt.dataIsAlreadySorted());
+            JobSpecification spec =
+                    apiFramework.compileQuery(hcc, metadataProvider, null, 0, 
null, sessionConfig, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1894,8 +1864,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    public void handleDeleteStatement(MetadataProvider metadataProvider, 
Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+    public void handleDeleteStatement(MetadataProvider metadataProvider, 
Statement stmt, IHyracksClientConnection hcc)
+            throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtDelete.getDataverseName());
@@ -1934,13 +1904,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     @Override
     public JobSpecification rewriteCompileQuery(IClusterInfoCollector 
clusterInfoCollector,
-            MetadataProvider metadataProvider, Query query,
-            ICompiledDmlStatement stmt)
+            MetadataProvider metadataProvider, Query query, 
ICompiledDmlStatement stmt)
             throws RemoteException, AlgebricksException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata 
transaction)
-        Pair<IReturningStatement, Integer> rewrittenResult = 
apiFramework.reWriteQuery(declaredFunctions,
-                metadataProvider, query, sessionConfig);
+        Pair<IReturningStatement, Integer> rewrittenResult =
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionConfig);
 
         // Query Compilation (happens under the same ongoing metadata 
transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, (Query) rewrittenResult.first,
@@ -1952,8 +1921,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throws RemoteException, AlgebricksException, ACIDException {
 
         // Insert/upsert statement rewriting (happens under the same ongoing 
metadata transaction)
-        Pair<IReturningStatement, Integer> rewrittenResult = 
apiFramework.reWriteQuery(declaredFunctions,
-                metadataProvider, insertUpsert, sessionConfig);
+        Pair<IReturningStatement, Integer> rewrittenResult =
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionConfig);
 
         InsertStatement rewrittenInsertUpsert = (InsertStatement) 
rewrittenResult.first;
         String dataverseName = 
getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -2049,8 +2018,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             boolean extendingExisting = cfps.getSourcePolicyName() != null;
             String description = cfps.getDescription() == null ? "" : 
cfps.getDescription();
             if (extendingExisting) {
-                FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
-                        
.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, 
cfps.getSourcePolicyName());
+                FeedPolicyEntity sourceFeedPolicy = 
MetadataManager.INSTANCE.getFeedPolicy(
+                        metadataProvider.getMetadataTxnContext(), dataverse, 
cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = 
MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
                             MetadataConstants.METADATA_DATAVERSE_NAME, 
cfps.getSourcePolicyName());
@@ -2105,8 +2074,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
 
             EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-            FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                    .getActiveEntityListener(feedId);
+            FeedEventsListener listener =
+                    (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(feedId);
             if (listener != null) {
                 StringBuilder builder = new StringBuilder();
                 for (FeedConnectionId connectionId : 
listener.getConnections()) {
@@ -2134,8 +2103,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleDropFeedPolicyStatement(MetadataProvider 
metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleDropFeedPolicyStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) 
stmt;
@@ -2176,8 +2144,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         IActiveLifecycleEventSubscriber eventSubscriber = new 
ActiveLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
cfs.getFeedName());
-        FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        FeedEventsListener listener =
+                (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
         MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, 
dataverseName + "." + datasetName,
                 dataverseName + "." + feedName);
         try {
@@ -2196,26 +2164,26 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new CompilationException("Feed " + cfs.getFeedName() + " 
is already connected to dataset "
                         + cfs.getDatasetName().getValue());
             }
-            FeedPolicyEntity feedPolicy = 
FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
-                    mdTxnCtx);
+            FeedPolicyEntity feedPolicy =
+                    FeedMetadataUtil.validateIfPolicyExists(dataverseName, 
cbfs.getPolicyName(), mdTxnCtx);
             // All Metadata checks have passed. Feed connect request is valid. 
//
             if (listener == null) {
                 listener = new FeedEventsListener(entityId);
                 
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
             }
             FeedPolicyAccessor policyAccessor = new 
FeedPolicyAccessor(feedPolicy.getProperties());
-            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = 
getFeedConnectionRequest(dataverseName,
-                    feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
+            Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple =
+                    getFeedConnectionRequest(dataverseName, feed, 
cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
             FeedConnectionRequest connectionRequest = triple.first;
             boolean createFeedIntakeJob = triple.second;
             listener.registerFeedEventSubscriber(eventSubscriber);
             subscriberRegistered = true;
             if (createFeedIntakeJob) {
                 EntityId feedId = 
connectionRequest.getFeedJointKey().getFeedId();
-                Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(),
-                        feedId.getEntityName());
-                Pair<JobSpecification, IAdapterFactory> pair = 
FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
-                        metadataProvider, policyAccessor);
+                Feed primaryFeed =
+                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getEntityName());
+                Pair<JobSpecification, IAdapterFactory> pair =
+                        FeedOperations.buildFeedIntakeJobSpec(primaryFeed, 
metadataProvider, policyAccessor);
                 // adapter configuration are valid at this stage
                 // register the feed joints (these are auto-de-registered)
                 int numOfPrividers = 
pair.second.getPartitionConstraint().getLocations().length;
@@ -2276,8 +2244,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         FeedRuntimeType connectionLocation;
         FeedJointKey feedJointKey = getFeedJointKey(feed, mdTxnCtx);
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverse, 
feed.getFeedName());
-        FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        FeedEventsListener listener =
+                (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
         if (listener == null) {
             throw new CompilationException("Feed Listener is not registered");
         }
@@ -2339,8 +2307,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (sourceFeed.getAppliedFunction() != null) {
                 appliedFunctions.add(0, 
sourceFeed.getAppliedFunction().getName());
             }
-            Feed parentFeed = MetadataManager.INSTANCE.getFeed(ctx, 
feed.getDataverseName(),
-                    sourceFeed.getSourceFeedName());
+            Feed parentFeed =
+                    MetadataManager.INSTANCE.getFeed(ctx, 
feed.getDataverseName(), sourceFeed.getSourceFeedName());
             sourceFeed = parentFeed;
         }
 
@@ -2365,12 +2333,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, 
feed.getDataverseName(), feed.getFeedName());
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), 
cfs.getDatasetName().getValue());
         IActiveLifecycleEventSubscriber eventSubscriber = new 
ActiveLifecycleEventSubscriber();
-        FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
-                .getActiveEntityListener(entityId);
+        FeedEventsListener listener =
+                (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
         if (listener == null || !listener.isEntityUsingDataset(dataverseName, 
datasetName)) {
-            throw new CompilationException(
-                    "Feed " + feed.getFeedId().getEntityName() + " is 
currently not connected to "
-                    + cfs.getDatasetName().getValue() + ". Invalid 
operation!");
+            throw new CompilationException("Feed " + 
feed.getFeedId().getEntityName()
+                    + " is currently not connected to " + 
cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
         listener.registerFeedEventSubscriber(eventSubscriber);
         MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, 
dataverseName + "." + datasetName,
@@ -2382,8 +2349,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new CompilationException(
                         "Unknown dataset :" + cfs.getDatasetName().getValue() 
+ " in dataverse " + dataverseName);
             }
-            Pair<JobSpecification, Boolean> specDisconnectType = FeedOperations
-                    .buildDisconnectFeedJobSpec(metadataProvider, 
connectionId);
+            Pair<JobSpecification, Boolean> specDisconnectType =
+                    FeedOperations.buildDisconnectFeedJobSpec(connectionId);
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2415,8 +2382,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         SubscribeFeedStatement bfs = (SubscribeFeedStatement) stmt;
         bfs.initialize(metadataProvider.getMetadataTxnContext());
 
-        CompiledSubscribeFeedStatement csfs = new 
CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(),
-                bfs.getVarCounter());
+        CompiledSubscribeFeedStatement csfs =
+                new 
CompiledSubscribeFeedStatement(bfs.getSubscriptionRequest(), 
bfs.getVarCounter());
         
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + 
Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, 
"" + bfs.getPolicy());
         metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS,
@@ -2445,8 +2412,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (jobSpec != null) {
                 FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
                         
.getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
-                FeedConnectJobInfo activeJob = new 
FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
-                        null, ActivityState.ACTIVE,
+                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(
+                        bfs.getSubscriptionRequest().getReceivingFeedId(), 
null, ActivityState.ACTIVE,
                         new 
FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
                         listener.getSourceFeedJoint(), null, alteredJobSpec, 
policy.getProperties());
                 
alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
 activeJob);
@@ -2486,10 +2453,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     ds.getItemTypeDataverseName(), itemTypeName);
             ARecordType metaRecordType = null;
             if (ds.hasMetaPart()) {
-                metaRecordType = (ARecordType) MetadataManager.INSTANCE
-                        .getDatatype(metadataProvider.getMetadataTxnContext(), 
ds.getMetaItemTypeDataverseName(),
-                                ds.getMetaItemTypeName())
-                        .getDatatype();
+                metaRecordType =
+                        (ARecordType) 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                                ds.getMetaItemTypeDataverseName(), 
ds.getMetaItemTypeName()).getDatatype();
             }
             // Prepare jobs to compact the datatset and its indexes
             List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
@@ -2497,27 +2463,24 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new AlgebricksException(
                         "Cannot compact the extrenal dataset " + datasetName + 
" because it has no indexes");
             }
-            Dataverse dataverse = 
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-                    dataverseName);
-            
jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, 
datasetName, metadataProvider));
+            Dataverse dataverse =
+                    
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), 
dataverseName);
+            jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, 
datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType enforcedType = createEnforcedType(aRecordType, 
indexes);
+            Pair<ARecordType, ARecordType> enforcedTypes =
+                    TypeUtil.createEnforcedType(aRecordType, metaRecordType, 
indexes);
+            ARecordType enforcedType = enforcedTypes.first;
+            ARecordType enforcedMeta = enforcedTypes.second;
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
-                        CompiledIndexCompactStatement cics = new 
CompiledIndexCompactStatement(dataverseName,
-                                datasetName, indexes.get(j).getIndexName(), 
indexes.get(j).getKeyFieldNames(),
-                                indexes.get(j).getKeyFieldTypes(), 
indexes.get(j).isEnforcingKeyFileds(),
-                                indexes.get(j).getGramLength(), 
indexes.get(j).getIndexType());
-                        List<Integer> keySourceIndicators = 
indexes.get(j).getKeyFieldSourceIndicators();
-
-                        
jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, 
aRecordType,
-                                metaRecordType, keySourceIndicators, 
enforcedType, metadataProvider));
+                        
jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, 
indexes.get(j), aRecordType,
+                                metaRecordType, enforcedType, enforcedMeta, 
metadataProvider));
                     }
                 }
             } else {
-                prepareCompactJobsForExternalDataset(indexes, dataverseName, 
datasetName, ds, jobsToExecute,
-                        aRecordType, metaRecordType, metadataProvider, 
enforcedType);
+                prepareCompactJobsForExternalDataset(indexes, ds, 
jobsToExecute, aRecordType, metaRecordType,
+                        metadataProvider, enforcedType, enforcedMeta);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2533,28 +2496,23 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.compactEnd(dataverseName, 
dataverseName + "." + datasetName);
+            
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
-    protected void prepareCompactJobsForExternalDataset(List<Index> indexes, 
String dataverseName, String datasetName,
-            Dataset ds, List<JobSpecification> jobsToExecute, ARecordType 
aRecordType, ARecordType metaRecordType,
-            MetadataProvider metadataProvider, ARecordType enforcedType) 
throws AlgebricksException {
+    protected void prepareCompactJobsForExternalDataset(List<Index> indexes, 
Dataset ds,
+            List<JobSpecification> jobsToExecute, ARecordType aRecordType, 
ARecordType metaRecordType,
+            MetadataProvider metadataProvider, ARecordType enforcedType, 
ARecordType enforcedMeta)
+            throws AlgebricksException {
         for (int j = 0; j < indexes.size(); j++) {
             if (!ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                CompiledIndexCompactStatement cics = new 
CompiledIndexCompactStatement(dataverseName, datasetName,
-                        indexes.get(j).getIndexName(), 
indexes.get(j).getKeyFieldNames(),
-                        indexes.get(j).getKeyFieldTypes(), 
indexes.get(j).isEnforcingKeyFileds(),
-                        indexes.get(j).getGramLength(), 
indexes.get(j).getIndexType());
-                List<Integer> keySourceIndicators = null;
-                if (ds.hasMetaPart()) {
-                    keySourceIndicators = 
indexes.get(j).getKeyFieldSourceIndicators();
-                }
-                
jobsToExecute.add(IndexOperations.buildSecondaryIndexCompactJobSpec(cics, 
aRecordType, metaRecordType,
-                        keySourceIndicators, enforcedType, metadataProvider));
+                
jobsToExecute.add(IndexUtil.buildSecondaryIndexCompactJobSpec(ds, 
indexes.get(j), aRecordType,
+                        metaRecordType, enforcedType, enforcedMeta, 
metadataProvider));
             }
 
         }
-        
jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, 
metadataProvider));
+        
jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, 
metadataProvider,
+                new StorageComponentProvider()));
     }
 
     protected JobSpecification handleQuery(MetadataProvider metadataProvider, 
Query query,
@@ -2563,7 +2521,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, 
query.getDataverses(), query.getDatasets());
+        MetadataLockManager.INSTANCE.queryBegin(activeDataverse, 
query.getDataverses(), query.getDatasets());
         try {
             JobSpecification jobSpec = rewriteCompileQuery(hcc, 
metadataProvider, query, null);
 
@@ -2598,8 +2556,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         ResultHandle hand;
         switch (resultDelivery) {
             case ASYNC:
-                hand = new 
ResultHandle(jobId,metadataProvider.getResultSetId());
-                ResultUtil.printResultHandle(hand,sessionConfig);
+                hand = new ResultHandle(jobId, 
metadataProvider.getResultSetId());
+                ResultUtil.printResultHandle(hand, sessionConfig);
                 hcc.waitForCompletion(jobId);
                 sessionConfig.out().flush();
                 break;
@@ -2611,8 +2569,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 break;
             case DEFERRED:
                 hcc.waitForCompletion(jobId);
-                hand = new 
ResultHandle(jobId,metadataProvider.getResultSetId());
-                ResultUtil.printResultHandle(hand,sessionConfig);
+                hand = new ResultHandle(jobId, 
metadataProvider.getResultSetId());
+                ResultUtil.printResultHandle(hand, sessionConfig);
                 sessionConfig.out().flush();
                 break;
             default:
@@ -2620,8 +2578,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleCreateNodeGroupStatement(MetadataProvider 
metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleCreateNodeGroupStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
 
         NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
         String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
@@ -2658,7 +2615,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         RefreshExternalDatasetStatement stmtRefresh = 
(RefreshExternalDatasetStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtRefresh.getDataverseName());
         String datasetName = stmtRefresh.getDatasetName().getValue();
-        ExternalDatasetTransactionState transactionState = 
ExternalDatasetTransactionState.COMMIT;
+        TransactionState transactionState = TransactionState.COMMIT;
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, 
dataverseName + "." + datasetName);
         boolean bActiveTxn = true;
@@ -2738,20 +2695,20 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
 
             // Create the files index update job
-            spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, 
metadataFiles, deletedFiles, addedFiles,
-                    appendedFiles, metadataProvider);
+            spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, 
metadataFiles, addedFiles, appendedFiles,
+                    metadataProvider);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            transactionState = ExternalDatasetTransactionState.BEGIN;
+            transactionState = TransactionState.BEGIN;
 
             // run the files update job
             JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
-                    spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, 
index, metadataFiles, deletedFiles,
-                            addedFiles, appendedFiles, metadataProvider);
+                    spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, 
index, metadataFiles, addedFiles,
+                            appendedFiles, metadataProvider);
                     // run the files update job
                     JobUtils.runJob(hcc, spec, true);
                 }
@@ -2765,12 +2722,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
             ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
-                    .setState(ExternalDatasetTransactionState.READY_TO_COMMIT);
+                    .setState(TransactionState.READY_TO_COMMIT);
             ((ExternalDatasetDetails) 
transactionDataset.getDatasetDetails()).setRefreshTimestamp(txnTime);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, 
transactionDataset);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
+            transactionState = TransactionState.READY_TO_COMMIT;
             // We don't release the latch since this job is expected to be 
quick
             JobUtils.runJob(hcc, spec, true);
             // Start a new metadata transaction to record the final state of 
the transaction
@@ -2779,9 +2736,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             bActiveTxn = true;
 
             for (ExternalFile file : metadataFiles) {
-                if (file.getPendingOp() == 
ExternalFilePendingOp.PENDING_DROP_OP) {
+                if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
                     MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                } else if (file.getPendingOp() == 
ExternalFilePendingOp.PENDING_NO_OP) {
+                } else if (file.getPendingOp() == ExternalFilePendingOp.NO_OP) 
{
                     Iterator<ExternalFile> iterator = appendedFiles.iterator();
                     while (iterator.hasNext()) {
                         ExternalFile appendedFile = iterator.next();
@@ -2792,7 +2749,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                             
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
                             // add the original file with appended information
                             appendedFile.setFileNumber(file.getFileNumber());
-                            
appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                            
appendedFile.setPendingOp(ExternalFilePendingOp.NO_OP);
                             MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, 
appendedFile);
                             iterator.remove();
                         }
@@ -2808,13 +2765,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // insert new files
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                file.setPendingOp(ExternalFilePendingOp.NO_OP);
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
             // mark the transaction as complete
-            ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
-                    .setState(ExternalDatasetTransactionState.COMMIT);
+            ((ExternalDatasetDetails) 
transactionDataset.getDatasetDetails()).setState(TransactionState.COMMIT);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, 
transactionDataset);
 
             // commit metadata transaction
@@ -2824,15 +2780,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            if (transactionState == 
ExternalDatasetTransactionState.READY_TO_COMMIT) {
+            if (transactionState == TransactionState.READY_TO_COMMIT) {
                 throw new IllegalStateException("System is inconsistent state: 
commit of (" + dataverseName + "."
                         + datasetName + ") refresh couldn't carry out the 
commit phase", e);
             }
-            if (transactionState == ExternalDatasetTransactionState.COMMIT) {
+            if (transactionState == TransactionState.COMMIT) {
                 // Nothing to do , everything should be clean
                 th

<TRUNCATED>

Reply via email to