Repository: asterixdb Updated Branches: refs/heads/master 3be86c582 -> fa1b5e83d
[NO ISSUE][ING] Make resume attempt on the same suspend/resume thread - user model changes: no - storage format changes: no - interface changes: no Details: - Previously, the resume attempt happened in a different thread but used the same metadata provider used by the suspension thread. Additional locks got acquired during compilation and were added to the lock list of the metadata provider. When the locks were released, we got an illegal state exception due to releasing locks acquired by other threads. Change-Id: Icc801923b167862286a5104b199cdc43e76096c8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2104 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fa1b5e83 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fa1b5e83 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fa1b5e83 Branch: refs/heads/master Commit: fa1b5e83d993011ff03297fdff4d843df2c484d2 Parents: 3be86c5 Author: Abdullah Alamoudi <[email protected]> Authored: Wed Oct 25 14:09:04 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Wed Oct 25 16:15:40 2017 -0700 ---------------------------------------------------------------------- .../app/active/ActiveEntityEventsListener.java | 23 +++++--------------- .../apache/asterix/app/active/RecoveryTask.java | 12 +++++----- .../asterix/test/active/TestEventsListener.java | 11 +++++++--- 3 files changed, 19 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fa1b5e83/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index e3c1fed..bae04d5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -388,14 +388,11 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl protected abstract Void doStop(MetadataProvider metadataProvider) throws HyracksDataException; - protected abstract Void doSuspend(MetadataProvider metadataProvider) - throws HyracksDataException; + protected abstract Void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException; - protected abstract void doResume(MetadataProvider metadataProvider) - throws HyracksDataException; + protected abstract void doResume(MetadataProvider metadataProvider) throws HyracksDataException; - protected abstract void setRunning(MetadataProvider metadataProvider, boolean running) - throws HyracksDataException; + protected abstract void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException; @Override public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { @@ -505,21 +502,11 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl return; } setState(ActivityState.RESUMING); - WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, - EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED)); rt = new RecoveryTask(appCtx, this, retryPolicyFactory); - metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor() - .submit(() -> rt.resumeOrRecover(metadataProvider)); try { - subscriber.sync(); - if (subscriber.getFailure() != null) { - 
LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId, - subscriber.getFailure()); - } - } catch (InterruptedException e) { + rt.resumeOrRecover(metadataProvider); + } catch (Exception e) { LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId, e); - Thread.currentThread().interrupt(); - throw HyracksDataException.create(e); } } finally { suspended = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fa1b5e83/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java index 73439ce..29b38ba 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -79,15 +79,14 @@ public class RecoveryTask { cancelRecovery = true; } - protected Void resumeOrRecover(MetadataProvider metadataProvider) - throws HyracksDataException, AlgebricksException, InterruptedException { + protected void resumeOrRecover(MetadataProvider metadataProvider) throws HyracksDataException { try { synchronized (listener) { listener.doResume(metadataProvider); listener.setState(ActivityState.RUNNING); } } catch (Exception e) { - LOGGER.log(Level.WARNING, "First attempt to resume " + listener.getEntityId() + " Failed", e); + LOGGER.log(Level.WARNING, "Attempt to resume " + listener.getEntityId() + " Failed", e); synchronized (listener) { if (listener.getState() == ActivityState.RESUMING) { // This will be the case if compilation failure @@ -103,11 +102,12 @@ public class RecoveryTask { } } } else { - IRetryPolicy policy = retryPolicyFactory.create(listener); - doRecover(policy); + LOGGER.log(Level.WARNING, "Submitting recovery task for " + 
listener.getEntityId()); + metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor() + .submit(() -> doRecover(retryPolicyFactory.create(listener))); } + throw e; } - return null; } protected Void doRecover(IRetryPolicy policy) http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fa1b5e83/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java index 90880aa..5199b1c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java @@ -30,13 +30,13 @@ import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.metadata.LockList; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -107,6 +107,12 @@ public class TestEventsListener extends ActiveEntityEventsListener { @Override protected void 
doStart(MetadataProvider metadataProvider) throws HyracksDataException { step(onStart); + try { + metadataProvider.getApplicationContext().getMetadataLockManager() + .acquireDatasetReadLock(metadataProvider.getLocks(), "Default.type"); + } catch (AsterixException e) { + throw HyracksDataException.create(e); + } failCompile(onStart); JobId jobId = jobIdFactory.create(); Action startJob = clusterController.startActiveJob(jobId, entityId); @@ -177,8 +183,7 @@ public class TestEventsListener extends ActiveEntityEventsListener { } @Override - protected void setRunning(MetadataProvider metadataProvider, boolean running) - throws HyracksDataException { + protected void setRunning(MetadataProvider metadataProvider, boolean running) throws HyracksDataException { try { IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); LockList locks = metadataProvider.getLocks();
