http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index bd5c024..7725936 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,53 +34,50 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
-import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.lang.common.base.IReturningStatement;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -123,7 +120,6 @@ import 
org.apache.asterix.lang.common.statement.TypeDropStatement;
 import org.apache.asterix.lang.common.statement.WriteStatement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.metadata.IDatasetDetails;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
@@ -145,12 +141,12 @@ import 
org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
-import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -159,16 +155,16 @@ import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.asterix.translator.SessionConfig;
-import org.apache.asterix.translator.SessionOutput;
-import org.apache.asterix.translator.TypeTranslator;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
 import org.apache.asterix.utils.FeedOperations;
@@ -178,7 +174,6 @@ import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Triple;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
@@ -217,18 +212,17 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     protected final List<FunctionDecl> declaredFunctions;
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
-    protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
     protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
+    protected final IMetadataLockManager lockManager;
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionOutput output,
-            ILangCompilationProvider compliationProvider, 
IStorageComponentProvider componentProvider,
-            ExecutorService executorService) {
+            ILangCompilationProvider compliationProvider, ExecutorService 
executorService) {
         this.appCtx = appCtx;
+        this.lockManager = appCtx.getMetadataLockManager();
         this.statements = statements;
         this.sessionOutput = output;
         this.sessionConfig = output.config();
-        this.componentProvider = componentProvider;
         declaredFunctions = getDeclaredFunctions(statements);
         apiFramework = new APIFramework(compliationProvider);
         rewriterFactory = compliationProvider.getRewriterFactory();
@@ -281,7 +275,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                 }
                 validateOperation(appCtx, activeDataverse, stmt);
                 rewriteStatement(stmt); // Rewrite the statement's AST.
-                MetadataProvider metadataProvider = new 
MetadataProvider(appCtx, activeDataverse, componentProvider);
+                MetadataProvider metadataProvider = new 
MetadataProvider(appCtx, activeDataverse);
                 metadataProvider.setWriterFactory(writerFactory);
                 
metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
                 metadataProvider.setOutputFile(outputFile);
@@ -431,7 +425,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         String dvName = dvd.getDataverseName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(),
 dvName);
+        lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), 
dvName);
         try {
             Dataverse dv = 
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), 
dvName);
             if (dv == null) {
@@ -454,7 +448,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-        
MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(),
 dvName);
+        lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), 
dvName);
         try {
             Dataverse dv = 
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), 
dvName);
             if (dv != null) {
@@ -529,7 +523,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.createDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 itemTypeDataverseName, itemTypeDataverseName + "." + 
itemTypeName, metaItemTypeDataverseName,
                 metaItemTypeDataverseName + "." + metaItemTypeName, 
nodegroupName, compactionPolicy,
                 dataverseName + "." + datasetName, defaultCompactionPolicy);
@@ -693,11 +687,11 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     protected static void 
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
             throws CompilationException {
         StringBuilder builder = null;
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener.isEntityUsingDataset(dataset)) {
+            if (listener.isEntityUsingDataset(dataset) && listener.isActive()) 
{
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -741,15 +735,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
+        String indexName = stmtCreateIndex.getIndexName().getValue();
         List<Integer> keySourceIndicators = 
stmtCreateIndex.getFieldSourceIndicators();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), 
dataverseName,
-                dataverseName + "." + datasetName);
-        String indexName = null;
+        String datasetFullyQualifiedName = dataverseName + "." + datasetName;
         Dataset ds = null;
-        // For external datasets
         Index index = null;
+        MetadataLockUtil.createIndexBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
+                datasetFullyQualifiedName);
         try {
             ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
@@ -757,7 +751,6 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         "There is no dataset with this name " + datasetName + 
" in dataverse " + dataverseName);
             }
 
-            indexName = stmtCreateIndex.getIndexName().getValue();
             index = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                     datasetName, indexName);
             if (index != null) {
@@ -1111,7 +1104,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String typeName = stmtCreateType.getIdent().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.createTypeBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + typeName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, 
dataverseName);
@@ -1157,7 +1150,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        
MetadataLockManager.INSTANCE.acquireDataverseWriteLock(metadataProvider.getLocks(),
 dataverseName);
+        lockManager.acquireDataverseWriteLock(metadataProvider.getLocks(), 
dataverseName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, 
dataverseName);
             if (dv == null) {
@@ -1168,26 +1161,31 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     throw new AlgebricksException("There is no dataverse with 
this name " + dataverseName + ".");
                 }
             }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
             // # disconnect all feeds from any datasets in the dataverse.
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
             IActiveEntityEventsListener[] activeListeners = 
activeEventHandler.getEventListeners();
-            Identifier dvId = new Identifier(dataverseName);
-            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, 
metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
-            tempMdProvider.setConfig(metadataProvider.getConfig());
             for (IActiveEntityEventsListener listener : activeListeners) {
                 EntityId activeEntityId = listener.getEntityId();
                 if 
(activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
                         && 
activeEntityId.getDataverse().equals(dataverseName)) {
-                    tempMdProvider.getLocks().reset();
-                    stopFeedBeforeDelete(new Pair<>(dvId, new 
Identifier(activeEntityId.getEntityName())),
-                            tempMdProvider);
-                    // prepare job to remove feed log storage
-                    
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
-                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverseName, activeEntityId.getEntityName())));
+                    if (listener.getState() != ActivityState.STOPPED) {
+                        ((ActiveEntityEventsListener) 
listener).stop(metadataProvider);
+                    }
+                    FeedEventsListener feedListener = (FeedEventsListener) 
listener;
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    bActiveTxn = true;
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    doDropFeed(hcc, metadataProvider, feedListener.getFeed());
+                    
MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+                    bActiveTxn = false;
                 }
             }
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. prepare jobs which will drop corresponding datasets with 
indexes.
             List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
@@ -1243,7 +1241,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // Drops all node groups that no longer needed
             for (Dataset dataset : datasets) {
                 String nodeGroup = dataset.getNodeGroupName();
-                
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 nodeGroup);
+                
lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
                 if (MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup) 
!= null) {
                     MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, 
nodeGroup, true);
                 }
@@ -1294,26 +1292,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void stopFeedBeforeDelete(Pair<Identifier, Identifier> 
feedNameComp, MetadataProvider metadataProvider) {
-        StopFeedStatement disStmt = new StopFeedStatement(feedNameComp);
-        try {
-            handleStopFeedStatement(metadataProvider, disStmt);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Stopped feed " + feedNameComp.second.getValue());
-            }
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to stop feed " + 
feedNameComp.second.getValue() + exception);
-            }
-        }
-    }
-
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
-        
MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.dropDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
             doDropDataset(dataverseName, datasetName, metadataProvider, 
stmtDelete.getIfExists(), hcc, true);
@@ -1386,14 +1370,14 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
         String datasetName = stmtIndexDrop.getDatasetName().getValue();
         String dataverseName = 
getActiveDataverse(stmtIndexDrop.getDataverseName());
+        String indexName = stmtIndexDrop.getIndexName().getValue();
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        
MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(), 
dataverseName,
-                dataverseName + "." + datasetName);
-        String indexName = null;
+        String dsFullyQualifiedName = dataverseName + "." + datasetName;
+        MetadataLockUtil.dropIndexBegin(lockManager, 
metadataProvider.getLocks(), dataverseName, dsFullyQualifiedName);
         // For external index
         boolean dropFilesIndex = false;
         try {
@@ -1402,8 +1386,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + 
" in dataverse " + dataverseName);
             }
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
             IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
@@ -1420,7 +1404,6 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataverseName, datasetName, indexName);
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
@@ -1581,7 +1564,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.dropTypeBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + typeName);
         try {
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, 
dataverseName, typeName);
@@ -1606,7 +1589,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 nodegroupName);
+        lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), 
nodegroupName);
         try {
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
nodegroupName);
             if (ng == null) {
@@ -1634,7 +1617,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(),
 dataverse,
+        MetadataLockUtil.functionStatementBegin(lockManager, 
metadataProvider.getLocks(), dataverse,
                 dataverse + "." + functionName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, 
dataverse);
@@ -1662,7 +1645,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         
signature.setNamespace(getActiveDataverseName(signature.getNamespace()));
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(),
 signature.getNamespace(),
+        MetadataLockUtil.functionStatementBegin(lockManager, 
metadataProvider.getLocks(), signature.getNamespace(),
                 signature.getNamespace() + "." + signature.getName());
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, 
signature);
@@ -1692,7 +1675,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
             CompiledLoadFromFileStatement cls =
@@ -1723,7 +1706,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() throws AsterixException {
-                
MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+                MetadataLockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(),
                         dataverseName + "." + 
stmtInsertUpsert.getDatasetName());
             }
 
@@ -1783,7 +1766,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+        MetadataLockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(),
                 dataverseName + "." + stmtDelete.getDatasetName());
         try {
             metadataProvider.setWriteTransaction(true);
@@ -1860,7 +1843,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.createFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + feedName);
         Feed feed = null;
         try {
@@ -1895,7 +1878,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
         dataverse = getActiveDataverse(null);
         policy = cfps.getPolicyName();
-        
MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(), 
dataverse,
+        MetadataLockUtil.createFeedPolicyBegin(lockManager, 
metadataProvider.getLocks(), dataverse,
                 dataverse + "." + policy);
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1956,7 +1939,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String feedName = stmtFeedDrop.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.dropFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + feedName);
         try {
             Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverseName, feedName);
@@ -1967,27 +1950,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return;
             }
-
-            EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-            ActiveEntityEventsListener listener =
-                    (ActiveEntityEventsListener) 
activeEventHandler.getActiveEntityListener(feedId);
-            if (listener != null) {
-                throw new AlgebricksException("Feed " + feedId
-                        + " is currently active and connected to the following 
dataset(s) \n" + listener.toString());
-            } else {
-                JobSpecification spec = 
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
-                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getEntityName()));
-                runJob(hcc, spec);
-                MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, 
feedName);
-            }
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Removed feed " + feedId);
-            }
+            doDropFeed(hcc, metadataProvider, feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -1996,13 +1960,36 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
     }
 
+    protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider 
metadataProvider, Feed feed)
+            throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
+        EntityId feedId = feed.getFeedId();
+        ActiveNotificationHandler activeNotificationHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        ActiveEntityEventsListener listener =
+                (ActiveEntityEventsListener) 
activeNotificationHandler.getListener(feedId);
+        if (listener != null && listener.getState() != ActivityState.STOPPED) {
+            throw new AlgebricksException("Feed " + feedId
+                    + " is currently active and connected to the following 
dataset(s) \n" + listener.toString());
+        } else if (listener != null) {
+            listener.unregister();
+        }
+        JobSpecification spec = 
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
+                MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getEntityName()));
+        runJob(hcc, spec);
+        MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), 
feed.getFeedName());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removed feed " + feedId);
+        }
+    }
+
     protected void handleDropFeedPolicyStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) 
stmt;
         String dataverseName = 
getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
         String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
-        
MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.dropFeedPolicyBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + policyName);
         try {
             FeedPolicyEntity feedPolicy = 
MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
@@ -2028,56 +2015,45 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         String dataverseName = getActiveDataverse(sfs.getDataverseName());
         String feedName = sfs.getFeedName().getValue();
-        // Transcation handler
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        // Runtime handler
-        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-        // Feed & Feed Connections
-        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
feedName,
-                metadataProvider.getMetadataTxnContext());
-        List<FeedConnection> feedConnections = MetadataManager.INSTANCE
-                .getFeedConections(metadataProvider.getMetadataTxnContext(), 
dataverseName, feedName);
-        ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
-        IStorageComponentProvider storageComponentProvider = new 
StorageComponentProvider();
-        DefaultStatementExecutorFactory qtFactory = new 
DefaultStatementExecutorFactory();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler
-                .getActiveEntityListener(entityId);
-        if (listener != null) {
-            throw new AlgebricksException("Feed " + feedName + " is started 
already.");
-        }
-        // Start
-        
MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), 
dataverseName,
-                dataverseName + "." + feedName, feedConnections);
+        boolean committed = false;
+        MetadataLockUtil.startFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + feedName);
         try {
-            // Prepare policy
-            List<IDataset> datasets = new ArrayList<>();
-            for (FeedConnection connection : feedConnections) {
-                Dataset ds = 
metadataProvider.findDataset(connection.getDataverseName(), 
connection.getDatasetName());
-                datasets.add(ds);
-            }
-            org.apache.commons.lang3.tuple.Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(sessionOutput, 
metadataProvider, feed, feedConnections,
-                            compilationProvider, storageComponentProvider, 
qtFactory, hcc);
-
-            JobSpecification feedJob = jobInfo.getLeft();
-            listener = new ActiveEntityEventsListener(appCtx, entityId, 
datasets, jobInfo.getRight(),
-                    FeedIntakeOperatorNodePushable.class.getSimpleName());
-            activeEventHandler.registerListener(listener);
-            IActiveEventSubscriber eventSubscriber = new 
WaitForStateSubscriber(listener, ActivityState.STARTED);
-            
feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, 
entityId);
-            // TODO(Yingyi): currently we do not check IFrameWriter protocol 
violations for Feed jobs.
-            // We will need to design general exception handling mechanism for 
feeds.
-            JobUtils.runJob(hcc, feedJob,
-                    
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
-            eventSubscriber.sync();
-            LOGGER.log(Level.INFO, "Submitted");
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            // Runtime handler
+            EntityId entityId = new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName);
+            // Feed & Feed Connections
+            Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
feedName,
+                    metadataProvider.getMetadataTxnContext());
+            List<FeedConnection> feedConnections = MetadataManager.INSTANCE
+                    
.getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, 
feedName);
+            for (FeedConnection feedConnection : feedConnections) {
+                // what if the dataset is in a different dataverse
+                String fqName = feedConnection.getDataverseName() + "." + 
feedConnection.getDatasetName();
+                
lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), fqName);
+            }
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
+            if (listener == null) {
+                // Prepare policy
+                List<Dataset> datasets = new ArrayList<>();
+                for (FeedConnection connection : feedConnections) {
+                    Dataset ds =
+                            
metadataProvider.findDataset(connection.getDataverseName(), 
connection.getDatasetName());
+                    datasets.add(ds);
+                }
+                listener = new FeedEventsListener(this, 
metadataProvider.getApplicationContext(), hcc, entityId,
+                        datasets, null, 
FeedIntakeOperatorNodePushable.class.getSimpleName(),
+                        NoRetryPolicyFactory.INSTANCE, feed, feedConnections);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            committed = true;
+            listener.start(metadataProvider);
         } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            if (listener != null) {
-                activeEventHandler.unregisterListener(listener);
+            if (!committed) {
+                abort(e, e, mdTxnCtx);
             }
             throw e;
         } finally {
@@ -2089,32 +2065,18 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         StopFeedStatement sfst = (StopFeedStatement) stmt;
         String dataverseName = getActiveDataverse(sfst.getDataverseName());
         String feedName = sfst.getFeedName().getValue();
-        EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         // Obtain runtime info from ActiveListener
-        ActiveEntityEventsListener listener =
-                (ActiveEntityEventsListener) 
activeEventHandler.getActiveEntityListener(feedId);
+        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
         if (listener == null) {
             throw new AlgebricksException("Feed " + feedName + " is not 
started.");
         }
-        IActiveEventSubscriber eventSubscriber = new 
WaitForStateSubscriber(listener, ActivityState.STOPPED);
-        // Transaction
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), 
dataverseName, feedName);
+        MetadataLockUtil.stopFeedBegin(lockManager, 
metadataProvider.getLocks(), entityId.getDataverse(),
+                entityId.getEntityName());
         try {
-            // validate
-            FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, 
mdTxnCtx);
-            // Construct ActiveMessage
-            for (int i = 0; i < listener.getLocations().getLocations().length; 
i++) {
-                String intakeLocation = 
listener.getLocations().getLocations()[i];
-                FeedOperations.SendStopMessageToNode(appCtx, feedId, 
intakeLocation, i);
-            }
-            eventSubscriber.sync();
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
+            listener.stop(metadataProvider);
         } finally {
             metadataProvider.getLocks().unlock();
         }
@@ -2130,20 +2092,20 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // Check whether feed is alive
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName)) != null) {
-            throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
-        }
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         // Transaction handling
-        
MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.connectFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName, dataverseName + "." + 
feedName);
         try {
             // validation
-            FeedMetadataUtil.validateIfDatasetExists(metadataProvider, 
dataverseName, datasetName, mdTxnCtx);
+            Dataset dataset = 
FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, 
datasetName);
             Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
feedName,
                     metadataProvider.getMetadataTxnContext());
+            FeedEventsListener listener = (FeedEventsListener) 
activeEventHandler.getListener(feed.getFeedId());
+            if (listener != null && listener.isActive()) {
+                throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
+            }
             ARecordType outputType = FeedMetadataUtil.getOutputType(feed, 
feed.getAdapterConfiguration(),
                     ExternalDataConstants.KEY_TYPE_NAME);
             List<FunctionSignature> appliedFunctions = 
cfs.getAppliedFunctions();
@@ -2169,6 +2131,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 MetadataManager.INSTANCE.updateFunction(mdTxnCtx, func);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            if (listener != null) {
+                listener.add(dataset);
+                listener.addFeedConnection(fc);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2184,21 +2150,25 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        // Check whether feed is alive
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName)) != null) {
-            throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
-        }
-        
MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.disconnectFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName, dataverseName + "." + 
cfs.getFeedName());
         try {
-            FeedMetadataUtil.validateIfDatasetExists(metadataProvider, 
dataverseName, cfs.getDatasetName().getValue(),
-                    mdTxnCtx);
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            // Check whether feed is alive
+            ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler
+                    .getListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName));
+            if (listener != null && listener.isActive()) {
+                throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
+            }
+            FeedMetadataUtil.validateIfDatasetExists(metadataProvider, 
dataverseName, cfs.getDatasetName().getValue());
             FeedMetadataUtil.validateIfFeedExists(dataverseName, 
cfs.getFeedName().getValue(), mdTxnCtx);
             FeedConnection fc = 
MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
                     dataverseName, feedName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, 
datasetName);
+            if (ds == null) {
+                throw new CompilationException("Dataset " + dataverseName + 
"." + datasetName + " doesn't exist");
+            }
             if (fc == null) {
                 throw new CompilationException("Feed " + feedName + " is 
currently not connected to "
                         + cfs.getDatasetName().getValue() + ". Invalid 
operation!");
@@ -2210,6 +2180,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            if (listener != null) {
+                listener.remove(ds);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2227,7 +2200,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.compactBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
             Dataset ds = metadataProvider.findDataset(dataverseName, 
datasetName);
@@ -2447,7 +2420,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 ngName);
+        lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), 
ngName);
         try {
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
ngName);
             if (ng != null) {
@@ -2490,7 +2463,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         Dataset transactionDataset = null;
         boolean lockAquired = false;
         boolean success = false;
-        
MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(), 
dataverseName,
+        MetadataLockUtil.refreshDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
             ds = metadataProvider.findDataset(dataverseName, datasetName);
@@ -2721,7 +2694,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 DatasetUtil.isFullyQualifiedName(datasetNameTo) ? 
datasetNameTo : dataverseNameTo + '.' + datasetNameTo;
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
 fullyQualifiedDatasetNameTo);
+        MetadataLockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(), fullyQualifiedDatasetNameTo);
         try {
             prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, 
dataverseNameFrom, dataverseNameTo,
                     datasetNameFrom, datasetNameTo, mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index cd9138a..dc92f92 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -30,15 +30,14 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.api.http.server.DdlApiServlet;
 import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
-import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.FullApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.QueryApiServlet;
@@ -52,6 +51,7 @@ import org.apache.asterix.api.http.server.ShutdownApiServlet;
 import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
 import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.ResourceIdManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
@@ -65,6 +65,7 @@ import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.Servlets;
@@ -130,11 +131,11 @@ public class CCApplication extends BaseCCApplication {
                 .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, 
ccServiceCtx);
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
-        GlobalRecoveryManager.instantiate(ccServiceCtx, getHcc(), 
componentProvider);
+        GlobalRecoveryManager globalRecoveryManager = 
createGlobalRecoveryManager();
         statementExecutorCtx = new StatementExecutorContext();
         appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, resourceIdManager,
-                () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance(), ftStrategy,
-                new ActiveLifecycleListener(), componentProvider);
+                () -> MetadataManager.INSTANCE, globalRecoveryManager, 
ftStrategy, new ActiveNotificationHandler(),
+                componentProvider, new MetadataLockManager());
         ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
@@ -147,18 +148,22 @@ public class CCApplication extends BaseCCApplication {
         
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties);
-        
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveLifecycleListener());
+        
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
 
         // create event loop groups
         webManager = new WebManager();
         configureServers();
         webManager.start();
-        
ClusterManagerProvider.getClusterManager().registerSubscriber(GlobalRecoveryManager.instance());
+        
ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager);
         ccServiceCtx.addClusterLifecycleListener(new 
ClusterLifecycleListener(appCtx));
 
         jobCapacityController = new 
JobCapacityController(controllerService.getResourceManager());
     }
 
+    protected GlobalRecoveryManager createGlobalRecoveryManager() throws 
Exception {
+        return new GlobalRecoveryManager(ccServiceCtx, getHcc(), 
componentProvider);
+    }
+
     @Override
     protected void configureLoggingLevel(Level level) {
         super.configureLoggingLevel(level);
@@ -178,7 +183,7 @@ public class CCApplication extends BaseCCApplication {
 
     @Override
     public void stop() throws Exception {
-        ((ActiveLifecycleListener) appCtx.getActiveLifecycleListener()).stop();
+        ((ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler()).stop();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopping Asterix cluster controller");
         }
@@ -288,9 +293,8 @@ public class CCApplication extends BaseCCApplication {
                         ccExtensionManager.getCompilationProvider(SQLPP), 
getStatementExecutorFactory(),
                         componentProvider);
             case Servlets.QUERY_AQL:
-                return new QueryServiceServlet(ctx, paths, appCtx, AQL,
-                        ccExtensionManager.getCompilationProvider(AQL), 
getStatementExecutorFactory(),
-                        componentProvider);
+                return new QueryServiceServlet(ctx, paths, appCtx, AQL, 
ccExtensionManager.getCompilationProvider(AQL),
+                        getStatementExecutorFactory(), componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
             case Servlets.REBALANCE:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 2a1fd0b..3209557 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -43,33 +43,29 @@ import 
org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class GlobalRecoveryManager implements IGlobalRecoveryManager {
 
     private static final Logger LOGGER = 
Logger.getLogger(GlobalRecoveryManager.class.getName());
-    private static GlobalRecoveryManager instance;
-    private static ClusterState state;
-    private final IStorageComponentProvider componentProvider;
-    private final ICCServiceContext ccServiceCtx;
-    private IHyracksClientConnection hcc;
+    protected final IStorageComponentProvider componentProvider;
+    protected final ICCServiceContext serviceCtx;
+    protected IHyracksClientConnection hcc;
+    protected volatile boolean recoveryCompleted;
 
-    private GlobalRecoveryManager(ICCServiceContext ccServiceCtx, 
IHyracksClientConnection hcc,
-                                  IStorageComponentProvider componentProvider) 
{
-        setState(ClusterState.UNUSABLE);
-        this.ccServiceCtx = ccServiceCtx;
+    public GlobalRecoveryManager(ICCServiceContext serviceCtx, 
IHyracksClientConnection hcc,
+            IStorageComponentProvider componentProvider) {
+        this.serviceCtx = serviceCtx;
         this.hcc = hcc;
         this.componentProvider = componentProvider;
     }
 
     @Override
     public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> 
deadNodeIds) {
-        setState(ClusterStateManager.INSTANCE.getState());
-        ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(false);
         return Collections.emptySet();
     }
 
@@ -85,54 +81,59 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
     }
 
     @Override
-    public void startGlobalRecovery(ICcApplicationContext appCtx) {
-        // perform global recovery if state changed to active
-        final ClusterState newState = ClusterStateManager.INSTANCE.getState();
-        boolean needToRecover = !newState.equals(state) && (newState == 
ClusterState.ACTIVE);
-        if (needToRecover) {
-            setState(newState);
-            ccServiceCtx.getControllerService().getExecutor().submit(() -> {
-                LOGGER.info("Starting Global Recovery");
-                MetadataTransactionContext mdTxnCtx = null;
+    public void startGlobalRecovery(ICcApplicationContext appCtx) throws 
HyracksDataException {
+        if (!recoveryCompleted) {
+            recover(appCtx);
+        }
+    }
+
+    protected void recover(ICcApplicationContext appCtx) throws 
HyracksDataException {
+        LOGGER.info("Starting Global Recovery");
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            MetadataManager.INSTANCE.init();
+            // Loop over datasets
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            for (Dataverse dataverse : 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
+                mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // This needs to be fixed <-- Needs to shutdown the system -->
+            /*
+             * Note: Throwing this illegal state exception will terminate this 
thread
+             * and feeds listeners will not be notified.
+             */
+            LOGGER.log(Level.SEVERE, "Global recovery was not completed 
successfully: ", e);
+            if (mdTxnCtx != null) {
                 try {
-                    MetadataManager.INSTANCE.init();
-                    // Loop over datasets
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    for (Dataverse dataverse : 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
-                        mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
-                    }
-                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                } catch (Exception e) {
-                    // This needs to be fixed <-- Needs to shutdown the system 
-->
-                    /*
-                     * Note: Throwing this illegal state exception will 
terminate this thread
-                     * and feeds listeners will not be notified.
-                     */
-                    LOGGER.log(Level.SEVERE, "Global recovery was not 
completed successfully: ", e);
-                    if (mdTxnCtx != null) {
-                        try {
-                            
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                        } catch (Exception e1) {
-                            LOGGER.log(Level.SEVERE, "Exception in aborting", 
e1);
-                            e1.addSuppressed(e);
-                            throw new IllegalStateException(e1);
-                        }
-                    }
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                } catch (Exception e1) {
+                    LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
+                    e1.addSuppressed(e);
+                    throw new IllegalStateException(e1);
                 }
-                ClusterStateManager.INSTANCE.setGlobalRecoveryCompleted(true);
-                LOGGER.info("Global Recovery Completed");
-            });
+            }
+            throw HyracksDataException.create(e);
+        }
+        recoveryCompleted = true;
+        LOGGER.info("Global Recovery Completed");
+    }
+
+    @Override
+    public void notifyStateChange(ClusterState newState) {
+        if (newState != ClusterState.ACTIVE) {
+            recoveryCompleted = false;
         }
     }
 
     private MetadataTransactionContext recoverDataset(ICcApplicationContext 
appCtx, MetadataTransactionContext mdTxnCtx,
-                                                      Dataverse dataverse)
-            throws Exception {
+            Dataverse dataverse) throws Exception {
         if 
(!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME))
 {
-            MetadataProvider metadataProvider = new MetadataProvider(appCtx, 
dataverse, componentProvider);
+            MetadataProvider metadataProvider = new MetadataProvider(appCtx, 
dataverse);
             try {
-                List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
-                        dataverse.getDataverseName());
+                List<Dataset> datasets =
+                        
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, 
dataverse.getDataverseName());
                 for (Dataset dataset : datasets) {
                     if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
                         // External dataset
@@ -144,8 +145,8 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
                         TransactionState datasetState = dsd.getState();
                         if (!indexes.isEmpty()) {
                             if (datasetState == TransactionState.BEGIN) {
-                                List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
-                                        dataset);
+                                List<ExternalFile> files =
+                                        
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
                                 // if persumed abort, roll backward
                                 // 1. delete all pending files
                                 for (ExternalFile file : files) {
@@ -156,8 +157,8 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
                             }
                             // 2. clean artifacts in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec = 
ExternalIndexingOperations.buildAbortOp(dataset, indexes,
-                                    metadataProvider);
+                            JobSpecification jobSpec =
+                                    
ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 3. correct the dataset state
                             ((ExternalDatasetDetails) 
dataset.getDatasetDetails()).setState(TransactionState.COMMIT);
@@ -165,13 +166,13 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
                             
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                             mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
                         } else if (datasetState == 
TransactionState.READY_TO_COMMIT) {
-                            List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
-                                    dataset);
+                            List<ExternalFile> files =
+                                    
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
                             // if ready to commit, roll forward
                             // 1. commit indexes in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec = 
ExternalIndexingOperations.buildRecoverOp(dataset, indexes,
-                                    metadataProvider);
+                            JobSpecification jobSpec =
+                                    
ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 2. add pending files in metadata
                             for (ExternalFile file : files) {
@@ -213,20 +214,11 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
                 metadataProvider.getLocks().unlock();
             }
         }
-
         return mdTxnCtx;
     }
 
-    public static GlobalRecoveryManager instance() {
-        return instance;
-    }
-
-    public static synchronized void instantiate(ICCServiceContext 
ccServiceCtx, IHyracksClientConnection hcc,
-                                                IStorageComponentProvider 
componentProvider) {
-        instance = new GlobalRecoveryManager(ccServiceCtx, hcc, 
componentProvider);
-    }
-
-    public static synchronized void setState(ClusterState state) {
-        GlobalRecoveryManager.state = state;
+    @Override
+    public boolean isRecoveryCompleted() {
+        return recoveryCompleted;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 1ed37e0..4174685 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -28,18 +28,19 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.IntStream;
 
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.MetadataManagerUtil;
+import org.apache.asterix.metadata.api.IActiveEntityController;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.lock.LockList;
-import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
@@ -119,7 +120,6 @@ public class RebalanceUtil {
             // The target dataset for rebalance.
             targetDataset = 
sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
 
-
             // Rebalances the source dataset into the target dataset.
             rebalance(sourceDataset, targetDataset, metadataProvider, hcc, 
datasetRebalanceCallback);
 
@@ -128,8 +128,6 @@ public class RebalanceUtil {
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
-        } finally {
-            metadataProvider.getLocks().reset();
         }
 
         // Up to this point, since the bulk part of a rebalance operation is 
done,
@@ -143,7 +141,7 @@ public class RebalanceUtil {
             // Executes the 2nd Metadata transaction for switching the 
metadata entity.
             // It detaches the source dataset and attaches the target dataset 
to metadata's point of view.
             runMetadataTransaction(metadataProvider,
-                () -> rebalanceSwitch(sourceDataset, targetDataset, 
metadataProvider, hcc));
+                    () -> rebalanceSwitch(sourceDataset, targetDataset, 
metadataProvider, hcc));
             // Executes the 3rd Metadata transaction to drop the source 
dataset files and the node group for
             // the source dataset.
             runMetadataTransaction(metadataProvider, () -> 
dropSourceDataset(sourceDataset, metadataProvider, hcc));
@@ -188,8 +186,6 @@ public class RebalanceUtil {
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
-        } finally {
-            metadataProvider.getLocks().reset();
         }
     }
 
@@ -219,22 +215,25 @@ public class RebalanceUtil {
     private static void rebalanceSwitch(Dataset source, Dataset target, 
MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
-
-        // Acquires the metadata write lock for the source/target dataset.
-        writeLockDataset(metadataProvider.getLocks(), source);
-
-        Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, 
source.getDataverseName(),
-                source.getDatasetName());
-
-        if (sourceDataset == null) {
-            // The dataset has already been dropped.
-            // In this case, we should drop the generated target dataset files.
-            dropDatasetFiles(target, metadataProvider, hcc);
-            return;
+        // upgrade lock
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveNotificationHandler activeNotificationHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+        lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), 
DatasetUtil.getFullyQualifiedName(target));
+        try {
+            // Updates the dataset entry in the metadata storage
+            MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
+            for (IActiveEntityEventsListener listener : 
activeNotificationHandler.getEventListeners()) {
+                if (listener instanceof IActiveEntityController) {
+                    IActiveEntityController controller = 
(IActiveEntityController) listener;
+                    controller.replace(target);
+                }
+            }
+        } finally {
+            
lockManager.downgradeDatasetLockToExclusiveModify(metadataProvider.getLocks(),
+                    DatasetUtil.getFullyQualifiedName(target));
         }
-
-        // Updates the dataset entry in the metadata storage
-        MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
     }
 
     // Drops the source dataset.
@@ -245,12 +244,12 @@ public class RebalanceUtil {
         dropDatasetFiles(source, metadataProvider, hcc);
 
         // Drops the metadata entry of source dataset's node group.
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
         String sourceNodeGroup = source.getNodeGroupName();
-        
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 sourceNodeGroup);
+        
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 sourceNodeGroup);
         
MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(),
 sourceNodeGroup, true);
     }
 
-
     // Creates the files for the rebalance target dataset.
     private static void createRebalanceTarget(Dataset target, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
@@ -301,8 +300,8 @@ public class RebalanceUtil {
         int numKeys = source.getPrimaryKeys().size();
         int numValues = source.hasMetaPart() ? 2 : 1;
         int[] fieldPermutation = IntStream.range(0, numKeys + 
numValues).toArray();
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
upsertOpAndConstraints = DatasetUtil
-                .createPrimaryIndexUpsertOp(spec, metadataProvider, target,
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
upsertOpAndConstraints =
+                DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, 
target,
                         source.getPrimaryRecordDescriptor(metadataProvider), 
fieldPermutation,
                         MissingWriterFactory.INSTANCE);
         IOperatorDescriptor upsertOp = upsertOpAndConstraints.first;
@@ -334,13 +333,6 @@ public class RebalanceUtil {
         }
     }
 
-    // Acquires a read lock for the dataverse and a write lock for the 
dataset, in order to populate the dataset.
-    private static void writeLockDataset(LockList locks, Dataset dataset) 
throws AsterixException {
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, 
dataset.getDataverseName());
-        MetadataLockManager.INSTANCE.acquireDatasetWriteLock(locks,
-                dataset.getDataverseName() + "." + dataset.getDatasetName());
-    }
-
     // Creates and loads all secondary indexes for the rebalance target 
dataset.
     private static void createAndLoadSecondaryIndexesForTarget(Dataset source, 
Dataset target,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc) 
throws Exception {
@@ -349,13 +341,13 @@ public class RebalanceUtil {
                 continue;
             }
             // Creates the secondary index.
-            JobSpecification indexCreationJobSpec = 
IndexUtil.buildSecondaryIndexCreationJobSpec(target, index,
-                    metadataProvider);
+            JobSpecification indexCreationJobSpec =
+                    IndexUtil.buildSecondaryIndexCreationJobSpec(target, 
index, metadataProvider);
             JobUtils.runJob(hcc, indexCreationJobSpec, true);
 
             // Loads the secondary index.
-            JobSpecification indexLoadingJobSpec = 
IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index,
-                    metadataProvider);
+            JobSpecification indexLoadingJobSpec =
+                    IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, 
metadataProvider);
             JobUtils.runJob(hcc, indexLoadingJobSpec, true);
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 3d6543b..5abbe40 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -48,10 +47,8 @@ import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -183,8 +180,7 @@ public class ConnectorApiServletTest {
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         // Retrieves file splits of the dataset.
         MetadataProvider metadataProvider = new MetadataProvider(
-                (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), null,
-                new StorageComponentProvider());
+                (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), null);
         try {
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 99892c5..c1421c5 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -227,7 +227,7 @@ public class TestNodeController {
         Index index = primaryIndexInfo.getIndex();
         CcApplicationContext appCtx =
                 (CcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
-        MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse, 
storageComponentProvider);
+        MetadataProvider mdProvider = new MetadataProvider(appCtx, dataverse);
         try {
             return dataset.getResourceFactory(mdProvider, index, 
primaryIndexInfo.recordType, primaryIndexInfo.metaType,
                     primaryIndexInfo.mergePolicyFactory, 
primaryIndexInfo.mergePolicyProperties);
@@ -246,8 +246,7 @@ public class TestNodeController {
         Dataverse dataverse = new Dataverse(dataset.getDataverseName(), 
NonTaggedDataFormat.class.getName(),
                 MetadataUtil.PENDING_NO_OP);
         MetadataProvider mdProvider = new MetadataProvider(
-                (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse,
-                storageComponentProvider);
+                (ICcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
         try {
             IResourceFactory resourceFactory = 
dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
                     recordType, metaType, mergePolicyFactory, 
mergePolicyProperties);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
new file mode 100644
index 0000000..71cb038
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.active;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+abstract class Action { // a unit of test work: execute() runs it, sync() lets another thread wait for it to finish
+    boolean done = false; // becomes true, under this object's monitor, at the end of execute()
+    HyracksDataException failure; // assigned only in execute()'s catch block; stays null when doExecute() does not throw
+
+    void execute(MetadataProvider actorMdProvider) { // runs doExecute(), records any Exception, then marks the action done
+        try {
+            doExecute(actorMdProvider);
+        } catch (Exception e) { // NOTE(review): a non-Exception Throwable is not caught here, so this call would never set done
+            failure = HyracksDataException.create(e); // kept for hasFailed()/getFailure() instead of being rethrown
+        }
+        synchronized (this) { // same monitor that sync() waits on
+            done = true;
+            notifyAll(); // wakes every thread currently blocked in sync()
+        }
+    }
+
+    protected abstract void doExecute(MetadataProvider mdProvider) throws 
Exception; // the actual work supplied by subclasses; an Exception thrown here is captured by execute()
+
+    boolean hasFailed() { // true once a failure has been recorded by execute()
+        return failure != null;
+    }
+
+    HyracksDataException getFailure() { // the recorded failure, or null if none was recorded
+        return failure;
+    }
+
+    synchronized void sync() throws InterruptedException { // blocks the calling thread until done is true
+        while (!done) { // re-check after every wake-up; returns immediately if execute() already finished
+            wait();
+        }
+    }
+
+    boolean isDone() { // NOTE(review): reads done without synchronization — confirm callers tolerate a stale value
+        return done;
+    }
+}
\ No newline at end of file

Reply via email to