This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 763367fed3 Don't fully start some servers until upgrade is complete. (#5378)
763367fed3 is described below
commit 763367fed3a2f86b66a6c810fc707c3e5d8fee9c
Author: Dave Marion <[email protected]>
AuthorDate: Thu Mar 6 13:55:21 2025 -0500
Don't fully start some servers until upgrade is complete. (#5378)
Prevent the CompactionCoordinator, GarbageCollector, and
ScanServer from fully starting until the current version
of software matches the version stored on disk. Backported
AccumuloDataVersion.getCurrentVersion for this change.
Closes #5367
---
.../java/org/apache/accumulo/server/AbstractServer.java | 7 +++++++
.../org/apache/accumulo/server/AccumuloDataVersion.java | 14 ++++++++++++++
.../apache/accumulo/coordinator/CompactionCoordinator.java | 9 +++++++++
.../accumulo/coordinator/CompactionCoordinatorTest.java | 4 ++++
.../org/apache/accumulo/gc/SimpleGarbageCollector.java | 8 ++++++++
.../accumulo/manager/upgrade/UpgradeCoordinator.java | 4 +---
.../main/java/org/apache/accumulo/tserver/ScanServer.java | 13 ++++++++++++-
7 files changed, 55 insertions(+), 4 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 69378845e3..1ab7c186c3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -263,4 +263,11 @@ public abstract class AbstractServer
@Override
public void close() {}
+ protected void waitForUpgrade() throws InterruptedException {
+ while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) {
+ LOG.info("Waiting for upgrade to complete.");
+ Thread.sleep(1000);
+ }
+ }
+
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
index 666f1136d3..11b3deefd6 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
@@ -78,4 +78,18 @@ public class AccumuloDataVersion {
public static final Set<Integer> CAN_RUN =
Set.of(SHORTEN_RFILE_KEYS, CRYPTO_CHANGES, CURRENT_VERSION);
+
+ /**
+ * Get the stored, current working version.
+ *
+ * @param context the server context
+ * @return the stored data version
+ */
+ public static int getCurrentVersion(ServerContext context) {
+ int cv =
+ context.getServerDirs().getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
+ ServerContext.ensureDataVersionCompatible(cv);
+ return cv;
+ }
+
}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 5aad919206..4b49e88415 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -101,6 +101,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Sets;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.micrometer.core.instrument.Tag;
public class CompactionCoordinator extends AbstractServer implements
@@ -265,8 +266,16 @@ public class CompactionCoordinator extends AbstractServer implements
}
@Override
+ @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
public void run() {
+ try {
+ waitForUpgrade();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+ System.exit(1);
+ }
+
ServerAddress coordinatorAddress = null;
try {
coordinatorAddress = startCoordinatorClientService();
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 8c42064274..1f62ede587 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -224,6 +224,10 @@ public class CompactionCoordinatorTest {
public Collection<Tag> getServiceTags(HostAndPort clientAddr) {
return List.of();
}
+
+ @Override
+ protected void waitForUpgrade() throws InterruptedException {}
+
}
@Test
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 7d7096a860..179f97cce9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -152,6 +152,14 @@ public class SimpleGarbageCollector extends AbstractServer
@Override
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
public void run() {
+
+ try {
+ waitForUpgrade();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+ System.exit(1);
+ }
+
final VolumeManager fs = getContext().getVolumeManager();
// Sleep for an initial period, giving the manager time to start up and
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 2b93d4e84f..1f593d16fe 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -160,9 +160,7 @@ public class UpgradeCoordinator {
"Not currently in a suitable state to do zookeeper upgrade %s",
status);
try {
- int cv = context.getServerDirs()
- .getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
- ServerContext.ensureDataVersionCompatible(cv);
+ int cv = AccumuloDataVersion.getCurrentVersion(context);
this.currentVersion = cv;
if (cv == AccumuloDataVersion.get()) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 245302e14f..19aade0113 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -127,6 +127,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
public class ScanServer extends AbstractServer
implements TabletScanClientService.Iface, TabletHostingServer,
ServerProcessService.Iface {
@@ -375,7 +377,16 @@ public class ScanServer extends AbstractServer
}
@Override
+ @SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
public void run() {
+
+ try {
+ waitForUpgrade();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
+ System.exit(1);
+ }
+
SecurityUtil.serverLogin(getConfiguration());
ServerAddress address = null;
@@ -383,7 +394,7 @@ public class ScanServer extends AbstractServer
address = startScanServerClientService();
clientAddress = address.getAddress();
} catch (UnknownHostException e1) {
- throw new RuntimeException("Failed to start the compactor client service", e1);
+ throw new RuntimeException("Failed to start the scan server client service", e1);
}
MetricsInfo metricsInfo = getContext().getMetricsInfo();