This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 950fce0d36 Avoids redundant work in manager for log recovery (#5974)
950fce0d36 is described below
commit 950fce0d36c3ada186c8e9e0f34ca68cc3396f94
Author: Keith Turner <[email protected]>
AuthorDate: Mon Nov 10 12:07:40 2025 -0500
Avoids redundant work in manager for log recovery (#5974)
fixes #5969
---
.../accumulo/manager/TabletGroupWatcher.java | 6 +-
.../accumulo/manager/recovery/RecoveryManager.java | 145 ++++++++++++---------
2 files changed, 87 insertions(+), 64 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 5604e69c70..215e0b8f74 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -84,6 +84,7 @@ import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
import org.apache.accumulo.manager.Manager.TabletGoalState;
+import org.apache.accumulo.manager.recovery.RecoveryManager;
import org.apache.accumulo.manager.state.MergeStats;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.state.TableStats;
@@ -227,6 +228,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
TabletLists tLists = new TabletLists(manager, currentTServers);
+ RecoveryManager.RecoverySession recoverySession =
+ manager.recoveryManager.newRecoverySession();
+
ManagerState managerState = manager.getManagerState();
int[] counts = new int[TabletState.values().length];
stats.begin();
@@ -321,7 +325,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
if (goal == TabletGoalState.HOSTED) {
if ((state != TabletState.HOSTED && !tls.walogs.isEmpty())
- && manager.recoveryManager.recoverLogs(tls.extent, tls.walogs)) {
+ && recoverySession.recoverLogs(tls.walogs)) {
continue;
}
switch (state) {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index b1117eba3f..666970cc09 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.Manager;
@@ -150,78 +149,98 @@ public class RecoveryManager {
}
}
- public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs)
- throws IOException {
- boolean recoveryNeeded = false;
-
- for (Collection<String> logs : walogs) {
- for (String walog : logs) {
-
- Path switchedWalog = VolumeUtil.switchVolume(walog, FileType.WAL,
- manager.getContext().getVolumeReplacements());
- if (switchedWalog != null) {
- // replaces the volume used for sorting, but do not change entry in metadata table. When
- // the tablet loads it will change the metadata table entry. If
- // the tablet has the same replacement config, then it will find the sorted log.
- log.info("Volume replaced {} -> {}", walog, switchedWalog);
- walog = switchedWalog.toString();
- }
+ // caches per log recovery decisions for its lifetime
+ public class RecoverySession {
- String[] parts = walog.split("/");
- String sortId = parts[parts.length - 1];
- String filename = new Path(walog).toString();
- String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
+ private HashMap<String,Boolean> needsRecovery = new HashMap<>();
- boolean sortQueued;
- synchronized (this) {
- sortQueued = sortsQueued.contains(sortId);
- }
+ public boolean recoverLogs(Collection<Collection<String>> walogs) throws IOException {
+ boolean recoveryNeeded = false;
- if (sortQueued
- && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId)
- == null) {
- synchronized (this) {
- sortsQueued.remove(sortId);
+ for (Collection<String> logs : walogs) {
+ for (String walog : logs) {
+ var logNeedsRecovery = needsRecovery.get(walog);
+ if (logNeedsRecovery == null) {
+ logNeedsRecovery = recoverLog(walog);
+ needsRecovery.put(walog, logNeedsRecovery);
}
+ recoveryNeeded |= logNeedsRecovery;
}
+ }
- if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
- synchronized (this) {
- closeTasksQueued.remove(sortId);
- recoveryDelay.remove(sortId);
- sortsQueued.remove(sortId);
- }
- continue;
- }
+ return recoveryNeeded;
+ }
+ }
- recoveryNeeded = true;
- synchronized (this) {
- if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
- AccumuloConfiguration aconf = manager.getConfiguration();
- @SuppressWarnings("deprecation")
- LogCloser closer = Property.createInstanceFromPropertyName(aconf,
- aconf.resolve(Property.MANAGER_WAL_CLOSER_IMPLEMENTATION,
- Property.MANAGER_WALOG_CLOSER_IMPLEMETATION),
- LogCloser.class, new HadoopLogCloser());
- Long delay = recoveryDelay.get(sortId);
- if (delay == null) {
- delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
- } else {
- delay = Math.min(2 * delay, 1000 * 60 * 5L);
- }
-
- log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename,
- (delay / 1000), extent);
-
- ScheduledFuture<?> future = executor.schedule(
- new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
- ThreadPools.watchNonCriticalScheduledTask(future);
- closeTasksQueued.add(sortId);
- recoveryDelay.put(sortId, delay);
- }
+ public RecoverySession newRecoverySession() {
+ return new RecoverySession();
+ }
+
+ private boolean recoverLog(String walog) throws IOException {
+ boolean recoveryNeeded = false;
+
+ Path switchedWalog =
+ VolumeUtil.switchVolume(walog, FileType.WAL, manager.getContext().getVolumeReplacements());
+ if (switchedWalog != null) {
+ // replaces the volume used for sorting, but do not change entry in metadata table. When
+ // the tablet loads it will change the metadata table entry. If
+ // the tablet has the same replacement config, then it will find the sorted log.
+ log.info("Volume replaced {} -> {}", walog, switchedWalog);
+ walog = switchedWalog.toString();
+ }
+
+ String[] parts = walog.split("/");
+ String sortId = parts[parts.length - 1];
+ String filename = new Path(walog).toString();
+ String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString();
+
+ boolean sortQueued;
+ synchronized (this) {
+ sortQueued = sortsQueued.contains(sortId);
+ }
+
+ if (sortQueued
+ && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) {
+ synchronized (this) {
+ sortsQueued.remove(sortId);
+ }
+ }
+
+ if (exists(SortedLogState.getFinishedMarkerPath(dest))) {
+ synchronized (this) {
+ closeTasksQueued.remove(sortId);
+ recoveryDelay.remove(sortId);
+ sortsQueued.remove(sortId);
+ }
+ return false;
+ }
+
+ recoveryNeeded = true;
+ synchronized (this) {
+ if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
+ AccumuloConfiguration aconf = manager.getConfiguration();
+ @SuppressWarnings("deprecation")
+ LogCloser closer = Property.createInstanceFromPropertyName(aconf,
+ aconf.resolve(Property.MANAGER_WAL_CLOSER_IMPLEMENTATION,
+ Property.MANAGER_WALOG_CLOSER_IMPLEMETATION),
+ LogCloser.class, new HadoopLogCloser());
+ Long delay = recoveryDelay.get(sortId);
+ if (delay == null) {
+ delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY);
+ } else {
+ delay = Math.min(2 * delay, 1000 * 60 * 5L);
}
+
+ log.info("Starting recovery of {} (in : {}s)", filename, (delay / 1000));
+
+ ScheduledFuture<?> future = executor.schedule(
+ new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
+ closeTasksQueued.add(sortId);
+ recoveryDelay.put(sortId, delay);
}
}
+
return recoveryNeeded;
}
}