This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 950fce0d36 Avoids redundant work in manager for log recovery (#5974)
950fce0d36 is described below

commit 950fce0d36c3ada186c8e9e0f34ca68cc3396f94
Author: Keith Turner <[email protected]>
AuthorDate: Mon Nov 10 12:07:40 2025 -0500

    Avoids redundant work in manager for log recovery (#5974)
    
    fixes #5969
---
 .../accumulo/manager/TabletGroupWatcher.java       |   6 +-
 .../accumulo/manager/recovery/RecoveryManager.java | 145 ++++++++++++---------
 2 files changed, 87 insertions(+), 64 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 5604e69c70..215e0b8f74 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -84,6 +84,7 @@ import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
 import org.apache.accumulo.manager.Manager.TabletGoalState;
+import org.apache.accumulo.manager.recovery.RecoveryManager;
 import org.apache.accumulo.manager.state.MergeStats;
 import org.apache.accumulo.manager.state.TableCounts;
 import org.apache.accumulo.manager.state.TableStats;
@@ -227,6 +228,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
 
         TabletLists tLists = new TabletLists(manager, currentTServers);
 
+        RecoveryManager.RecoverySession recoverySession =
+            manager.recoveryManager.newRecoverySession();
+
         ManagerState managerState = manager.getManagerState();
         int[] counts = new int[TabletState.values().length];
         stats.begin();
@@ -321,7 +325,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
 
           if (goal == TabletGoalState.HOSTED) {
             if ((state != TabletState.HOSTED && !tls.walogs.isEmpty())
-                && manager.recoveryManager.recoverLogs(tls.extent, tls.walogs)) {
+                && recoverySession.recoverLogs(tls.walogs)) {
               continue;
             }
             switch (state) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index b1117eba3f..666970cc09 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.manager.Manager;
@@ -150,78 +149,98 @@ public class RecoveryManager {
     }
   }
 
-  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs)
-      throws IOException {
-    boolean recoveryNeeded = false;
-
-    for (Collection<String> logs : walogs) {
-      for (String walog : logs) {
-
-        Path switchedWalog = VolumeUtil.switchVolume(walog, FileType.WAL,
-            manager.getContext().getVolumeReplacements());
-        if (switchedWalog != null) {
-          // replaces the volume used for sorting, but do not change entry in metadata table. When
-          // the tablet loads it will change the metadata table entry. If
-          // the tablet has the same replacement config, then it will find the sorted log.
-          log.info("Volume replaced {} -> {}", walog, switchedWalog);
-          walog = switchedWalog.toString();
-        }
+  // caches per log recovery decisions for its lifetime
+  public class RecoverySession {
 
-        String[] parts = walog.split("/");
-        String sortId = parts[parts.length - 1];
-        String filename = new Path(walog).toString();
-        String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
+    private HashMap<String,Boolean> needsRecovery = new HashMap<>();
 
-        boolean sortQueued;
-        synchronized (this) {
-          sortQueued = sortsQueued.contains(sortId);
-        }
+    public boolean recoverLogs(Collection<Collection<String>> walogs) throws IOException {
+      boolean recoveryNeeded = false;
 
-        if (sortQueued
-            && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId)
-                == null) {
-          synchronized (this) {
-            sortsQueued.remove(sortId);
+      for (Collection<String> logs : walogs) {
+        for (String walog : logs) {
+          var logNeedsRecovery = needsRecovery.get(walog);
+          if (logNeedsRecovery == null) {
+            logNeedsRecovery = recoverLog(walog);
+            needsRecovery.put(walog, logNeedsRecovery);
           }
+          recoveryNeeded |= logNeedsRecovery;
         }
+      }
 
-        if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
-          synchronized (this) {
-            closeTasksQueued.remove(sortId);
-            recoveryDelay.remove(sortId);
-            sortsQueued.remove(sortId);
-          }
-          continue;
-        }
+      return recoveryNeeded;
+    }
+  }
 
-        recoveryNeeded = true;
-        synchronized (this) {
-          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
-            AccumuloConfiguration aconf = manager.getConfiguration();
-            @SuppressWarnings("deprecation")
-            LogCloser closer = Property.createInstanceFromPropertyName(aconf,
-                aconf.resolve(Property.MANAGER_WAL_CLOSER_IMPLEMENTATION,
-                    Property.MANAGER_WALOG_CLOSER_IMPLEMETATION),
-                LogCloser.class, new HadoopLogCloser());
-            Long delay = recoveryDelay.get(sortId);
-            if (delay == null) {
-              delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
-            } else {
-              delay = Math.min(2 * delay, 1000 * 60 * 5L);
-            }
-
-            log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename,
-                (delay / 1000), extent);
-
-            ScheduledFuture<?> future = executor.schedule(
-                new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
-            ThreadPools.watchNonCriticalScheduledTask(future);
-            closeTasksQueued.add(sortId);
-            recoveryDelay.put(sortId, delay);
-          }
+  public RecoverySession newRecoverySession() {
+    return new RecoverySession();
+  }
+
+  private boolean recoverLog(String walog) throws IOException {
+    boolean recoveryNeeded = false;
+
+    Path switchedWalog =
+        VolumeUtil.switchVolume(walog, FileType.WAL, manager.getContext().getVolumeReplacements());
+    if (switchedWalog != null) {
+      // replaces the volume used for sorting, but do not change entry in metadata table. When
+      // the tablet loads it will change the metadata table entry. If
+      // the tablet has the same replacement config, then it will find the sorted log.
+      log.info("Volume replaced {} -> {}", walog, switchedWalog);
+      walog = switchedWalog.toString();
+    }
+
+    String[] parts = walog.split("/");
+    String sortId = parts[parts.length - 1];
+    String filename = new Path(walog).toString();
+    String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
+
+    boolean sortQueued;
+    synchronized (this) {
+      sortQueued = sortsQueued.contains(sortId);
+    }
+
+    if (sortQueued
+        && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) {
+      synchronized (this) {
+        sortsQueued.remove(sortId);
+      }
+    }
+
+    if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
+      synchronized (this) {
+        closeTasksQueued.remove(sortId);
+        recoveryDelay.remove(sortId);
+        sortsQueued.remove(sortId);
+      }
+      return false;
+    }
+
+    recoveryNeeded = true;
+    synchronized (this) {
+      if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
+        AccumuloConfiguration aconf = manager.getConfiguration();
+        @SuppressWarnings("deprecation")
+        LogCloser closer = Property.createInstanceFromPropertyName(aconf,
+            aconf.resolve(Property.MANAGER_WAL_CLOSER_IMPLEMENTATION,
+                Property.MANAGER_WALOG_CLOSER_IMPLEMETATION),
+            LogCloser.class, new HadoopLogCloser());
+        Long delay = recoveryDelay.get(sortId);
+        if (delay == null) {
+          delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
+        } else {
+          delay = Math.min(2 * delay, 1000 * 60 * 5L);
         }
+
+        log.info("Starting recovery of {} (in : {}s)", filename, (delay / 1000));
+
+        ScheduledFuture<?> future = executor.schedule(
+            new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+        ThreadPools.watchNonCriticalScheduledTask(future);
+        closeTasksQueued.add(sortId);
+        recoveryDelay.put(sortId, delay);
       }
     }
+
     return recoveryNeeded;
   }
 }

Reply via email to