This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 55ad6acc39b5265487ab340b23cf0ea43a2913a1
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sat Apr 10 20:28:54 2021 +0300

    [NO ISSUE][MTD] Let active listeners acquire suspend locks
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Let each active listener acquire its suspend locks.
    
    Change-Id: I38cf6e9107ee5ce6a1084dca13708ea0153e9e56
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10985
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../app/active/ActiveEntityEventsListener.java     | 26 ++++++++++++++++++
 .../app/active/ActiveNotificationHandler.java      | 32 ++++------------------
 2 files changed, 32 insertions(+), 26 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 5f7d65e..cc4b25f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -44,12 +44,14 @@ import 
org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.active.message.StopRuntimeParameters;
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.metadata.api.IActiveEntityController;
@@ -58,6 +60,7 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
@@ -704,6 +707,29 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         return new RecoveryTask(appCtx, this, retryPolicyFactory);
     }
 
+    public void acquireSuspendLocks(MetadataProvider metadataProvider, Dataset 
targetDataset)
+            throws AlgebricksException {
+        // write lock the listener
+        // exclusive lock all the datasets (except the target dataset)
+        IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+        DataverseName dataverseName = entityId.getDataverseName();
+        String entityName = entityId.getEntityName();
+        lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), 
dataverseName, entityName);
+        acquireSuspendDatasetsLocks(metadataProvider, lockManager, 
targetDataset);
+    }
+
+    protected void acquireSuspendDatasetsLocks(MetadataProvider 
metadataProvider, IMetadataLockManager lockManager,
+            Dataset targetDataset) throws AlgebricksException {
+        for (Dataset dataset : getDatasets()) {
+            if (targetDataset != null && targetDataset.equals(dataset)) {
+                // DDL operation already acquired the proper lock for the 
operation
+                continue;
+            }
+            
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
 dataset.getDataverseName(),
+                    dataset.getDatasetName());
+        }
+    }
+
     @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\"," + 
"\"entityId\":\"" + entityId + "\","
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8b74e07..0d63bca 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -30,10 +29,8 @@ import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.tuple.Pair;
@@ -278,30 +275,13 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, 
MetadataProvider metadataProvider,
             Dataset targetDataset) {
         try {
-            // write lock the listener
-            // exclusive lock all the datasets (except the target dataset)
-            IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
-            DataverseName dataverseName = 
listener.getEntityId().getDataverseName();
-            String entityName = listener.getEntityId().getEntityName();
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Suspending " + listener.getEntityId());
-            }
-            LOGGER.log(level, "Acquiring locks");
-            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), 
dataverseName, entityName);
-            Set<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
-            for (Dataset dataset : datasets) {
-                if (targetDataset != null && targetDataset.equals(dataset)) {
-                    // DDL operation already acquired the proper lock for the 
operation
-                    continue;
-                }
-                
lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
-                        dataset.getDataverseName(), dataset.getDatasetName());
-            }
-            LOGGER.log(level, "locks acquired");
+            EntityId entityId = listener.getEntityId();
+            LOGGER.log(level, "Suspending {}", entityId);
+            LOGGER.log(level, "Acquiring locks for {}", entityId);
+            ((ActiveEntityEventsListener) 
listener).acquireSuspendLocks(metadataProvider, targetDataset);
+            LOGGER.log(level, "locks acquired for {}", entityId);
             ((ActiveEntityEventsListener) listener).suspend(metadataProvider);
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " suspended");
-            }
+            LOGGER.log(level, "{} suspended", entityId);
         } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Suspend active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);

Reply via email to