This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 55ad6acc39b5265487ab340b23cf0ea43a2913a1 Author: Murtadha Hubail <[email protected]> AuthorDate: Sat Apr 10 20:28:54 2021 +0300 [NO ISSUE][MTD] Let active listeners acquire suspend locks - user model changes: no - storage format changes: no - interface changes: no Details: - Let each active listener acquire its suspend locks. Change-Id: I38cf6e9107ee5ce6a1084dca13708ea0153e9e56 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10985 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../app/active/ActiveEntityEventsListener.java | 26 ++++++++++++++++++ .../app/active/ActiveNotificationHandler.java | 32 ++++------------------ 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 5f7d65e..cc4b25f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -44,12 +44,14 @@ import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.ActiveStatsRequestMessage; import org.apache.asterix.active.message.StopRuntimeParameters; +import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.metadata.api.IActiveEntityController; @@ -58,6 +60,7 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.translator.IStatementExecutor; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -704,6 +707,29 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl return new RecoveryTask(appCtx, this, retryPolicyFactory); } + public void acquireSuspendLocks(MetadataProvider metadataProvider, Dataset targetDataset) + throws AlgebricksException { + // write lock the listener + // exclusive lock all the datasets (except the target dataset) + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + DataverseName dataverseName = entityId.getDataverseName(); + String entityName = entityId.getEntityName(); + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName); + acquireSuspendDatasetsLocks(metadataProvider, lockManager, targetDataset); + } + + protected void acquireSuspendDatasetsLocks(MetadataProvider metadataProvider, IMetadataLockManager lockManager, + Dataset targetDataset) throws AlgebricksException { + for (Dataset dataset : getDatasets()) { + if (targetDataset != null && targetDataset.equals(dataset)) { + // DDL operation already acquired the proper lock for the operation + continue; + } + lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataset.getDataverseName(), + dataset.getDatasetName()); + } + } + @Override public String toString() { return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\"," diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 8b74e07..0d63bca 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveEvent.Kind; @@ -30,10 +29,8 @@ import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.active.IActiveNotificationHandler; import org.apache.asterix.active.message.ActivePartitionMessage; -import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.commons.lang3.tuple.Pair; @@ -278,30 +275,13 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider, Dataset targetDataset) { try { - // write lock the listener - // exclusive lock all the datasets (except the target dataset) - IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); - DataverseName dataverseName = listener.getEntityId().getDataverseName(); - String entityName = listener.getEntityId().getEntityName(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Suspending " + listener.getEntityId()); - } - LOGGER.log(level, "Acquiring locks"); - lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName); - Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); - for (Dataset dataset : datasets) { - if (targetDataset != null && targetDataset.equals(dataset)) { - // DDL operation already acquired the proper lock for the operation - continue; - } - lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), - dataset.getDataverseName(), dataset.getDatasetName()); - } - LOGGER.log(level, "locks acquired"); + EntityId entityId = listener.getEntityId(); + LOGGER.log(level, "Suspending {}", entityId); + LOGGER.log(level, "Acquiring locks for {}", entityId); + ((ActiveEntityEventsListener) listener).acquireSuspendLocks(metadataProvider, targetDataset); + LOGGER.log(level, "locks acquired for {}", entityId); ((ActiveEntityEventsListener) listener).suspend(metadataProvider); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, listener.getEntityId() + " suspended"); - } + LOGGER.log(level, "{} suspended", entityId); } catch (Throwable th) { // NOSONAR must halt in case of any failure LOGGER.error("Suspend active failed", th); ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);
