This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 2e1d59e3d6d9967a1e2d836da6b19109a4ff4698
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri May 19 14:56:48 2023 +0300

    [ASTERIXDB-3189][*DB] Allow queries to be canceled during compilation

    - user model changes: no
    - storage format changes: no
    - interface changes: no

    Details:

    - Allow queries to be canceled during compilation.
    - Allow queries to be interrupted while waiting for dataset upgrade lock which could potentially be held by the rebalance.
    - Log before and after rebalance acquires dataset upgrade lock.

    Change-Id: I6031f36df583ed790a0ec89885071c27ae8efdb9
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17543
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../asterix/app/translator/QueryTranslator.java | 25 ++++++++++++++++------
 .../org/apache/asterix/utils/RebalanceUtil.java | 2 ++
 .../asterix/common/metadata/IMetadataLock.java | 2 +-
 .../apache/asterix/common/metadata/LockList.java | 7 +++++-
 .../apache/asterix/metadata/lock/DatasetLock.java | 20 ++++++++++++-----
 5 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1729b5037d..298468c3fb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4176,9 +4176,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } private interface IMetadataLocker { - void lock() throws AlgebricksException; + void lock() throws HyracksDataException, AlgebricksException, InterruptedException; - void unlock() throws AlgebricksException; + void unlock() throws HyracksDataException, AlgebricksException; } private interface IResultPrinter { 
@@ -4193,10 +4193,19 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { + final IRequestTracker requestTracker = appCtx.getRequestTracker(); + final ClientRequest clientRequest = + (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); final IMetadataLocker locker = new IMetadataLocker() { @Override - public void lock() { - compilationLock.readLock().lock(); + public void lock() throws RuntimeDataException, InterruptedException { + try { + compilationLock.readLock().lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ensureNotCancelled(clientRequest); + throw e; + } } @Override @@ -4343,18 +4352,20 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); + if (cancellable) { + clientRequest.markCancellable(); + } locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); if (jobSpec == null) { return; } - if (cancellable) { - clientRequest.markCancellable(); - } final SchedulableClientRequest schedulableRequest = SchedulableClientRequest.of(clientRequest, requestParameters, metadataProvider, jobSpec); appCtx.getReceptionist().ensureSchedulable(schedulableRequest); + // ensure request not cancelled before running job + ensureNotCancelled(clientRequest); final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); clientRequest.setJobId(jobId); if (jId != null) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index b0dc16238b..e2d8e015f9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -243,8 +243,10 @@ public class RebalanceUtil { ActiveNotificationHandler activeNotificationHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); IMetadataLockManager lockManager = appCtx.getMetadataLockManager(); + LOGGER.debug("attempting to acquire dataset {} upgrade lock", source.getDatasetName()); lockManager.upgradeDatasetLockToWrite(metadataProvider.getLocks(), source.getDataverseName(), source.getDatasetName()); + LOGGER.debug("acquired dataset {} upgrade lock", source.getDatasetName()); LOGGER.info("Updating dataset {} node group from {} to {}", source.getDatasetName(), source.getNodeGroupName(), target.getNodeGroupName()); try { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java index 1f77aa0724..d491ea456f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLock.java @@ -63,7 +63,7 @@ public interface IMetadataLock { * @param mode * lock mode */ - void lock(IMetadataLock.Mode mode); + void lock(IMetadataLock.Mode mode) throws InterruptedException; /** * Release a lock diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java index 43a1849665..06a317ed26 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/LockList.java @@ -49,7 +49,12 @@ public class LockList { if (isContained(mode, lock)) { return; } - lock.lock(mode); + try { + lock.lock(mode); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AsterixException(e); + } indexes.put(lock.getKey(), locks.size()); locks.add(MutablePair.of(lock, mode)); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java index e0a67256ab..41d0e97827 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java @@ -79,8 +79,8 @@ public class DatasetLock implements IMetadataLock { lock.writeLock().unlock(); } - private void upgradeReadLock() { - upgradeLock.readLock().lock(); + private void upgradeReadLock() throws InterruptedException { + upgradeLock.readLock().lockInterruptibly(); } private void modifyReadLock() { @@ -185,7 +185,7 @@ public class DatasetLock implements IMetadataLock { } @Override - public void lock(IMetadataLock.Mode mode) { + public void lock(IMetadataLock.Mode mode) throws InterruptedException { switch (mode) { case INDEX_BUILD: readLock(); @@ -203,8 +203,7 @@ public class DatasetLock implements IMetadataLock { writeLock(); break; case READ: - readLock(); - upgradeReadLock(); + atomicReadLock(); break; default: throw new IllegalStateException("locking mode " + mode + " is not supported"); @@ -264,6 +263,17 @@ public class DatasetLock implements IMetadataLock { return Objects.equals(key, ((DatasetLock) o).key); } + private void atomicReadLock() throws InterruptedException { + readLock(); + try { + upgradeReadLock(); + } catch (InterruptedException e) { + readUnlock(); + Thread.currentThread().interrupt(); + throw e; + } + } + 
@Override public String toString() { return String.valueOf(key);
