This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f30ce8129b36292549ae1e0c6d5dbf2f66423b32
Author: Michael Blow <[email protected]>
AuthorDate: Tue Nov 5 08:13:18 2024 -0500

    [NO ISSUE][*DB][RT] Retry recovery task on any interleaving suspensions
    
    Upon encountering a non-retryable active failure, retry the recovery
    task if a suspend/resume interleaved with it, since the suspend/resume
    may have addressed the failure.
    
    Ext-ref: MB-64109
    Change-Id: I58546bf69a44465c50001fc5ae90b2e9d7cae176
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19046
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/app/active/ActiveEntityEventsListener.java |  6 ++++++
 .../org/apache/asterix/app/active/RecoveryTask.java    | 18 ++++++++++++++----
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 7b253f1544..f32c730515 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -105,6 +105,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
     protected int numDeRegistered;
     protected volatile RecoveryTask rt;
     protected volatile boolean suspended = false;
+    private long suspendCount;
     // failures
     protected Exception jobFailure;
     protected Exception resumeFailure;
@@ -265,6 +266,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
         return jobId;
     }
 
+    public long getSuspendCount() {
+        return suspendCount;
+    }
+
     @Override
     public String getStats() {
         return stats;
@@ -568,6 +573,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
             LOGGER.log(level, "{} waiting for ongoing activities", jobId);
             waitForNonTransitionState();
             LOGGER.log(level, "{} proceeding with suspension. current state is 
{}", jobId, state);
+            suspendCount++;
             if (state == ActivityState.STOPPED) {
                 suspended = true;
                 return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index e4e7b36442..b2e6204838 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -121,6 +121,7 @@ public class RecoveryTask {
     protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, 
InterruptedException {
         LOGGER.log(level, "Actual Recovery task has started");
         Exception failure;
+        long prevSuspendCount;
         do {
             synchronized (listener) {
                 while (!cancelRecovery && !canStartRecovery()) {
@@ -153,6 +154,7 @@ public class RecoveryTask {
                 listener.setState(ActivityState.TEMPORARILY_FAILED);
                 failure = e;
             } finally {
+                prevSuspendCount = listener.getSuspendCount();
                 releaseRecoveryLocks(metadataProvider);
             }
         } while (policy.retry(failure));
@@ -173,20 +175,28 @@ public class RecoveryTask {
         }
         IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
         IMetadataLockUtil lockUtil = 
metadataProvider.getApplicationContext().getMetadataLockUtil();
+        boolean retryRecovery = false;
         try {
             acquirePostRecoveryLocks(lockManager, lockUtil);
             synchronized (listener) {
                 if (!cancelRecovery && listener.getState() == 
ActivityState.TEMPORARILY_FAILED) {
-                    LOGGER.warn("Recovery for {} permanently failed", 
listener.getEntityId());
-                    listener.setState(ActivityState.STOPPED);
-                    listener.setRunning(metadataProvider, false);
+                    if (prevSuspendCount == listener.getSuspendCount()) {
+                        LOGGER.warn("Recovery for {} permanently failed", 
listener.getEntityId());
+                        listener.setState(ActivityState.STOPPED);
+                        listener.setRunning(metadataProvider, false);
+                    } else {
+                        LOGGER.log(level,
+                                "Retrying recovery on non-retryable failure 
due to interleaving suspend/resume for {}",
+                                listener.getEntityId());
+                        retryRecovery = true;
+                    }
                 }
                 listener.notifyAll();
             }
         } finally {
             releasePostRecoveryLocks();
         }
-        return null;
+        return retryRecovery ? doRecover(policy) : null;
     }
 
     protected void acquireRecoveryLocks(IMetadataLockManager lockManager, 
IMetadataLockUtil lockUtil)

Reply via email to