This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 2e1d59e3d6d9967a1e2d836da6b19109a4ff4698
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri May 19 14:56:48 2023 +0300

    [ASTERIXDB-3189][*DB] Allow queries to be canceled during compilation
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Allow queries to be canceled during compilation.
    - Allow queries to be interrupted while waiting for the
      dataset upgrade lock, which could potentially be
      held by a rebalance operation.
    - Log before and after rebalance acquires the dataset
      upgrade lock.
    
    Change-Id: I6031f36df583ed790a0ec89885071c27ae8efdb9
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17543
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../asterix/app/translator/QueryTranslator.java    | 25 ++++++++++++++++------
 .../org/apache/asterix/utils/RebalanceUtil.java    |  2 ++
 .../asterix/common/metadata/IMetadataLock.java     |  2 +-
 .../apache/asterix/common/metadata/LockList.java   |  7 +++++-
 .../apache/asterix/metadata/lock/DatasetLock.java  | 20 ++++++++++++-----
 5 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1729b5037d..298468c3fb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4176,9 +4176,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     }
 
     private interface IMetadataLocker {
-        void lock() throws AlgebricksException;
+        void lock() throws HyracksDataException, AlgebricksException, 
InterruptedException;
 
-        void unlock() throws AlgebricksException;
+        void unlock() throws HyracksDataException, AlgebricksException;
     }
 
     private interface IResultPrinter {
@@ -4193,10 +4193,19 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             IResultSet resultSet, ResultDelivery resultDelivery, 
ResultMetadata outMetadata, Stats stats,
             IRequestParameters requestParameters, Map<String, IAObject> 
stmtParams, IStatementRewriter stmtRewriter)
             throws Exception {
+        final IRequestTracker requestTracker = appCtx.getRequestTracker();
+        final ClientRequest clientRequest =
+                (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
-            public void lock() {
-                compilationLock.readLock().lock();
+            public void lock() throws RuntimeDataException, 
InterruptedException {
+                try {
+                    compilationLock.readLock().lockInterruptibly();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    ensureNotCancelled(clientRequest);
+                    throw e;
+                }
             }
 
             @Override
@@ -4343,18 +4352,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) 
requestTracker.get(requestParameters.getRequestReference().getUuid());
+        if (cancellable) {
+            clientRequest.markCancellable();
+        }
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            if (cancellable) {
-                clientRequest.markCancellable();
-            }
             final SchedulableClientRequest schedulableRequest =
                     SchedulableClientRequest.of(clientRequest, 
requestParameters, metadataProvider, jobSpec);
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
+            // ensure request not cancelled before running job
+            ensureNotCancelled(clientRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             clientRequest.setJobId(jobId);
             if (jId != null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index b0dc16238b..e2d8e015f9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -243,8 +243,10 @@ public class RebalanceUtil {
         ActiveNotificationHandler activeNotificationHandler =
                 (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         IMetadataLockManager lockManager = appCtx.getMetadataLockManager();
+        LOGGER.debug("attempting to acquire dataset {} upgrade lock", 
source.getDatasetName());
         lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), 
source.getDataverseName(),
                 source.getDatasetName());
+        LOGGER.debug("acquired dataset {} upgrade lock", 
source.getDatasetName());
         LOGGER.info("Updating dataset {} node group from {} to {}", 
source.getDatasetName(), source.getNodeGroupName(),
                 target.getNodeGroupName());
         try {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
index 1f77aa0724..d491ea456f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java
@@ -63,7 +63,7 @@ public interface IMetadataLock {
      * @param mode
      *            lock mode
      */
-    void lock(IMetadataLock.Mode mode);
+    void lock(IMetadataLock.Mode mode) throws InterruptedException;
 
     /**
      * Release a lock
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
index 43a1849665..06a317ed26 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java
@@ -49,7 +49,12 @@ public class LockList {
         if (isContained(mode, lock)) {
             return;
         }
-        lock.lock(mode);
+        try {
+            lock.lock(mode);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AsterixException(e);
+        }
         indexes.put(lock.getKey(), locks.size());
         locks.add(MutablePair.of(lock, mode));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index e0a67256ab..41d0e97827 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -79,8 +79,8 @@ public class DatasetLock implements IMetadataLock {
         lock.writeLock().unlock();
     }
 
-    private void upgradeReadLock() {
-        upgradeLock.readLock().lock();
+    private void upgradeReadLock() throws InterruptedException {
+        upgradeLock.readLock().lockInterruptibly();
     }
 
     private void modifyReadLock() {
@@ -185,7 +185,7 @@ public class DatasetLock implements IMetadataLock {
     }
 
     @Override
-    public void lock(IMetadataLock.Mode mode) {
+    public void lock(IMetadataLock.Mode mode) throws InterruptedException {
         switch (mode) {
             case INDEX_BUILD:
                 readLock();
@@ -203,8 +203,7 @@ public class DatasetLock implements IMetadataLock {
                 writeLock();
                 break;
             case READ:
-                readLock();
-                upgradeReadLock();
+                atomicReadLock();
                 break;
             default:
                 throw new IllegalStateException("locking mode " + mode + " is 
not supported");
@@ -264,6 +263,17 @@ public class DatasetLock implements IMetadataLock {
         return Objects.equals(key, ((DatasetLock) o).key);
     }
 
+    private void atomicReadLock() throws InterruptedException {
+        readLock();
+        try {
+            upgradeReadLock();
+        } catch (InterruptedException e) {
+            readUnlock();
+            Thread.currentThread().interrupt();
+            throw e;
+        }
+    }
+
     @Override
     public String toString() {
         return String.valueOf(key);

Reply via email to