This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2e8f5f30ea HDDS-6761. [SCM HA finalization] Handle restarts, crashes,
and leader changes. (#3534)
2e8f5f30ea is described below
commit 2e8f5f30ea070a710ca9ea4caf39a8208216707d
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Jul 12 18:31:40 2022 -0400
HDDS-6761. [SCM HA finalization] Handle restarts, crashes, and leader
changes. (#3534)
---
.../upgrade/AbstractLayoutVersionManager.java | 17 +-
.../ozone/upgrade/BasicUpgradeFinalizer.java | 59 ++--
.../DefaultUpgradeFinalizationExecutor.java | 6 +-
.../InjectedUpgradeFinalizationExecutor.java | 3 -
.../ozone/upgrade/TestBasicUpgradeFinalizer.java | 105 +++++-
.../hadoop/ozone/upgrade/UpgradeTestUtils.java | 59 ++++
.../upgrade/DataNodeUpgradeFinalizer.java | 2 +
.../org/apache/hadoop/hdds/scm/ha/SCMContext.java | 4 +-
.../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java | 8 +-
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 3 +
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 1 +
.../hadoop/hdds/scm/node/NodeStateManager.java | 6 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 29 +-
.../scm/pipeline/BackgroundPipelineCreator.java | 7 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 2 +
.../hdds/scm/pipeline/PipelineManagerImpl.java | 20 +-
.../scm/safemode/HealthyPipelineSafeModeRule.java | 19 +-
.../hdds/scm/safemode/SCMSafeModeManager.java | 2 +-
.../scm/server/upgrade/FinalizationCheckpoint.java | 25 +-
.../scm/server/upgrade/FinalizationManager.java | 16 +
.../server/upgrade/FinalizationManagerImpl.java | 38 +++
.../server/upgrade/FinalizationStateManager.java | 7 +
.../upgrade/FinalizationStateManagerImpl.java | 150 ++++++++-
.../scm/server/upgrade/SCMUpgradeFinalizer.java | 73 ++---
.../hdds/scm/pipeline/MockPipelineManager.java | 5 +
.../hdds/scm/upgrade/TestScmFinalization.java | 52 ++-
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 13 +-
.../hadoop/hdds/upgrade/TestHDDSUpgrade.java | 225 ++-----------
.../hadoop/hdds/upgrade/TestHddsUpgradeUtils.java | 272 ++++++++++++++++
.../hadoop/hdds/upgrade/TestScmHAFinalization.java | 352 +++++++++++++++++++++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 37 +--
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 28 +-
32 files changed, 1272 insertions(+), 373 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 7e043d5b31..e389f50605 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.upgrade;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
@@ -55,7 +54,7 @@ public abstract class AbstractLayoutVersionManager<T extends
LayoutFeature>
protected final TreeMap<Integer, LayoutFeature> features = new TreeMap<>();
@VisibleForTesting
protected final Map<String, LayoutFeature> featureMap = new HashMap<>();
- private volatile Status currentUpgradeState = FINALIZATION_REQUIRED;
+ private volatile Status currentUpgradeState;
// Allows querying upgrade state while an upgrade is in progress.
// Note that MLV may have been incremented during the upgrade
// by the time the value is read/used.
@@ -75,6 +74,8 @@ public abstract class AbstractLayoutVersionManager<T extends
LayoutFeature>
metadataLayoutVersion, softwareLayoutVersion));
} else if (metadataLayoutVersion == softwareLayoutVersion) {
currentUpgradeState = ALREADY_FINALIZED;
+ } else {
+ currentUpgradeState = FINALIZATION_REQUIRED;
}
LayoutFeature mlvFeature = features.get(metadataLayoutVersion);
@@ -125,7 +126,7 @@ public abstract class AbstractLayoutVersionManager<T
extends LayoutFeature>
metadataLayoutVersion = layoutFeature.layoutVersion();
LOG.info("Layout feature {} has been finalized.", layoutFeature);
if (!needsFinalization()) {
- completeFinalization();
+ LOG.info("Finalization is complete.");
}
} else {
String msgStart = "";
@@ -148,16 +149,6 @@ public abstract class AbstractLayoutVersionManager<T
extends LayoutFeature>
}
}
- private void completeFinalization() {
- lock.writeLock().lock();
- try {
- currentUpgradeState = FINALIZATION_DONE;
- LOG.info("Finalization is complete.");
- } finally {
- lock.writeLock().unlock();
- }
- }
-
private boolean softwareIsBehindMetaData() {
lock.readLock().lock();
try {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
index fa4af7a231..400fffc3fa 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -26,6 +26,7 @@ import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOU
import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_ACTION_VALIDATION_FAILED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
@@ -46,9 +47,10 @@ import
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType;
import org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
/**
- * UpgradeFinalizer implementation for the Storage Container Manager service.
+ * Base UpgradeFinalizer implementation to be extended by services.
*/
public abstract class BasicUpgradeFinalizer
<T, V extends AbstractLayoutVersionManager> implements UpgradeFinalizer<T>
{
@@ -82,29 +84,37 @@ public abstract class BasicUpgradeFinalizer
// sets the finalization status to FINALIZATION_IN_PROGRESS.
// Therefore, a lock is used to make sure only one finalization thread is
// running at a time.
- StatusAndMessages response = initFinalize(upgradeClientID, service);
+ if (isFinalized(versionManager.getUpgradeState())) {
+ return FINALIZED_MSG;
+ }
if (finalizationLock.tryLock()) {
try {
- // Even if the status indicates finalization completed, the component
- // may not have finished all its specific steps if finalization was
- // interrupted, so we should re-invoke them here.
+ StatusAndMessages response = initFinalize(upgradeClientID, service);
+ // If we were able to enter the lock and finalization status is "in
+ // progress", we should resume finalization because the last attempt
+ // was interrupted. If an attempt was currently ongoing, the lock
+ // would have been held.
if (response.status() == FINALIZATION_REQUIRED ||
- !componentFinishedFinalizationSteps(service)) {
+ response.status() == FINALIZATION_IN_PROGRESS) {
finalizationExecutor.execute(service, this);
- response = STARTING_MSG;
+ return STARTING_MSG;
}
+ // Else, the initial response we got from initFinalize can be used,
+ // since we do not need to start/resume finalization.
+ return response;
+ } catch (NotLeaderException e) {
+ LOG.info("Leader change encountered during finalization. This " +
+ "component will continue finalization as directed by the new " +
+ "leader.", e);
+ return FINALIZATION_IN_PROGRESS_MSG;
} finally {
finalizationLock.unlock();
}
} else {
- // We could not acquire the lock, so either finalization is ongoing, or
- // it already finished but we received multiple requests to
- // run it at the same time.
- if (!isFinalized(response.status())) {
- response = FINALIZATION_IN_PROGRESS_MSG;
- }
+ // Finalization has not completed, but another thread holds the lock to
+ // run finalization.
+ return FINALIZATION_IN_PROGRESS_MSG;
}
- return response;
}
public synchronized StatusAndMessages reportStatus(
@@ -126,12 +136,20 @@ public abstract class BasicUpgradeFinalizer
return versionManager.getUpgradeState();
}
+ /**
+ * Child classes may override this method to set when finalization has
+ * begun progress.
+ */
protected void preFinalizeUpgrade(T service) throws IOException {
- // No Op by default.
+ versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
}
+ /**
+ * Child classes may override this method to delay finalization being
+ * marked done until a set of post finalize actions complete.
+ */
protected void postFinalizeUpgrade(T service) throws IOException {
- // No Op by default.
+ versionManager.setUpgradeState(FINALIZATION_DONE);
}
@Override
@@ -226,15 +244,6 @@ public abstract class BasicUpgradeFinalizer
|| status.equals(FINALIZATION_DONE);
}
- /**
- * Child classes that have additional finalization steps can override this
- * method to check component specific state to determine whether
- * finalization still needs to be run.
- */
- protected boolean componentFinishedFinalizationSteps(T service) {
- return true;
- }
-
public abstract void finalizeLayoutFeature(LayoutFeature lf, T context)
throws UpgradeException;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
index 1e85581d9b..6746536d7c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.upgrade;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
@@ -43,8 +42,6 @@ public class DefaultUpgradeFinalizationExecutor<T>
throws IOException {
try {
finalizer.emitStartingMsg();
- finalizer.getVersionManager()
- .setUpgradeState(FINALIZATION_IN_PROGRESS);
finalizer.preFinalizeUpgrade(component);
@@ -59,9 +56,10 @@ public class DefaultUpgradeFinalizationExecutor<T>
if (finalizer.getVersionManager().needsFinalization()) {
finalizer.getVersionManager()
.setUpgradeState(FINALIZATION_REQUIRED);
- throw (e);
+ throw e;
}
} finally {
+ // Used for testing.
finalizer.markFinalizationDone();
}
}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
index bf2f35664a..23fff3576c 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
@@ -22,7 +22,6 @@ import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecuto
import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
@@ -73,8 +72,6 @@ public class InjectedUpgradeFinalizationExecutor<T> extends
try {
injectTestFunctionAtThisPoint(BEFORE_PRE_FINALIZE_UPGRADE);
finalizer.emitStartingMsg();
- finalizer.getVersionManager()
- .setUpgradeState(FINALIZATION_IN_PROGRESS);
finalizer.preFinalizeUpgrade(component);
injectTestFunctionAtThisPoint(AFTER_PRE_FINALIZE_UPGRADE);
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
index 5b83f3552e..96344f3984 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestBasicUpgradeFinalizer.java
@@ -22,9 +22,12 @@ import static
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLa
import static
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLayoutFeature.VERSION_3;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+
+import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.inOrder;
@@ -32,6 +35,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.hadoop.ozone.common.Storage;
import
org.apache.hadoop.ozone.upgrade.TestUpgradeFinalizerActions.MockLayoutVersionManager;
@@ -39,12 +47,17 @@ import
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test for BasicUpgradeFinalizer.
*/
public class TestBasicUpgradeFinalizer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBasicUpgradeFinalizer.class);
+
@Test
public void testFinalizerPhasesAreInvokedInOrder() throws IOException {
SimpleTestFinalizer finalizer = spy(SimpleTestFinalizer.class);
@@ -91,6 +104,82 @@ public class TestBasicUpgradeFinalizer {
finalizer.postCalled);
}
+
+ /**
+ * Tests that the upgrade finalizer gives expected statuses when multiple
+ * clients invoke finalize and query finalize status simultaneously.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentFinalization() throws Exception {
+ CountDownLatch pauseLatch = new CountDownLatch(1);
+ CountDownLatch unpauseLatch = new CountDownLatch(1);
+ // Pause finalization to test concurrent finalize requests. The injection
+ // point to pause at does not matter.
+ InjectedUpgradeFinalizationExecutor<Object> executor =
+ UpgradeTestUtils.newPausingFinalizationExecutor(
+ UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE,
+ pauseLatch, unpauseLatch, LOG);
+ SimpleTestFinalizer finalizer =
+ new SimpleTestFinalizer(
+ new MockLayoutVersionManager(VERSION_1.layoutVersion()), executor);
+
+ // The first finalize call should block until the executor is unpaused.
+ Future<?> firstFinalizeFuture = runFinalization(finalizer,
+ UpgradeFinalizer.Status.STARTING_FINALIZATION);
+ // Wait for finalization to pause at the halting point.
+ pauseLatch.await();
+
+ Future<?> secondFinalizeFuture = runFinalization(finalizer,
+ UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS);
+ Future<?> finalizeQueryFuture = runFinalizationQuery(finalizer,
+ UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS);
+
+ // While finalization is paused, the two following requests should have
+ // reported it is in progress.
+ secondFinalizeFuture.get();
+ finalizeQueryFuture.get();
+
+ // Now resume finalization so the initial finalize request can complete.
+ unpauseLatch.countDown();
+ firstFinalizeFuture.get();
+
+ // All subsequent queries should return finalization done, even if they
+ // land in parallel.
+ List<Future<?>> finalizeFutures = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ finalizeFutures.add(runFinalizationQuery(finalizer,
+ UpgradeFinalizer.Status.FINALIZATION_DONE));
+ }
+
+ // Wait for all queries to complete.
+ for (Future<?> finalizeFuture: finalizeFutures) {
+ finalizeFuture.get();
+ }
+ }
+
+ private Future<?> runFinalization(
+ BasicUpgradeFinalizer<Object, MockLayoutVersionManager> finalizer,
+ UpgradeFinalizer.Status expectedStatus) {
+ return Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ StatusAndMessages result = finalizer.finalize("test", new Object());
+ assertEquals(expectedStatus, result.status());
+ } catch (Exception ex) {
+ LOG.error("Finalization failed", ex);
+ fail("Finalization failed with exception: " +
+ ex.getMessage());
+ }
+ });
+ }
+
+ private Future<?> runFinalizationQuery(UpgradeFinalizer<Object> finalizer,
+ UpgradeFinalizer.Status expectedStatus) {
+ return Executors.newSingleThreadExecutor().submit(() -> {
+ assertEquals(expectedStatus, finalizer.getStatus());
+ });
+ }
+
/**
* Yet another mock finalizer.
*/
@@ -101,6 +190,9 @@ public class TestBasicUpgradeFinalizer {
private boolean finalizeCalled = false;
private boolean postCalled = false;
+ /**
+ * Invoked by Mockito.
+ */
SimpleTestFinalizer() throws IOException {
super(new MockLayoutVersionManager(VERSION_1.layoutVersion()));
}
@@ -109,13 +201,20 @@ public class TestBasicUpgradeFinalizer {
super(lvm);
}
+ SimpleTestFinalizer(MockLayoutVersionManager lvm,
+ UpgradeFinalizationExecutor<Object> executor) {
+ super(lvm, executor);
+ }
+
@Override
- protected void preFinalizeUpgrade(Object service) {
+ protected void preFinalizeUpgrade(Object service) throws IOException {
+ super.preFinalizeUpgrade(service);
preCalled = true;
}
@Override
- protected void postFinalizeUpgrade(Object service) {
+ protected void postFinalizeUpgrade(Object service) throws IOException {
+ super.postFinalizeUpgrade(service);
postCalled = true;
}
@@ -141,7 +240,7 @@ public class TestBasicUpgradeFinalizer {
@Override
public void runPrefinalizeStateActions(Storage storage, Object service) {
-
+ // no-op for testing.
}
}
}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
index d3990e6098..d6ea2ec8b8 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/UpgradeTestUtils.java
@@ -18,12 +18,15 @@
*/
package org.apache.hadoop.ozone.upgrade;
+import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.common.StorageInfo;
+import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
/**
* Upgrade related test utility methods.
@@ -51,4 +54,60 @@ public final class UpgradeTestUtils {
return versionFile;
}
+
+ /**
+ * @param haltingPoint Where to halt finalization in the returned
+ * executor's {@code execute} method.
+ * @param pauseLatch The latch that will be counted down 1 by the
+ * executor when the upgrade finalization has been paused.
+ * @param unpauseLatch The latch that the caller should count down to
+ * resume upgrade finalization.
+ * @param log Where to log messages about pausing and resuming finalization.
+ * @return A new InjectedUpgradeFinalizationExecutor
+ */
+ public static <T> InjectedUpgradeFinalizationExecutor<T>
+ newPausingFinalizationExecutor(UpgradeTestInjectionPoints haltingPoint,
+ CountDownLatch pauseLatch, CountDownLatch unpauseLatch, Logger log) {
+ InjectedUpgradeFinalizationExecutor<T>
+ executor = new InjectedUpgradeFinalizationExecutor<>();
+ executor.configureTestInjectionFunction(haltingPoint, () -> {
+ log.info("Halting upgrade finalization at point: {}", haltingPoint);
+ try {
+ pauseLatch.countDown();
+ unpauseLatch.await();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("SCM test finalization interrupted.", ex);
+ }
+ log.info("Upgrade finalization resumed from point: {}", haltingPoint);
+ return false;
+ });
+
+ return executor;
+ }
+
+ /**
+ * @param haltingPoint Where to halt finalization in the returned
+ * executor's {@code execute} method.
+ * @param terminateLatch The latch that will be counted down 1 by the
+ * executor when the upgrade finalization has been terminated.
+ * @param log Where to log messages about pausing and resuming finalization.
+ * @return A new InjectedUpgradeFinalizationExecutor
+ */
+ public static <T> InjectedUpgradeFinalizationExecutor<T>
+ newTerminatingFinalizationExecutor(
+ UpgradeTestInjectionPoints haltingPoint,
+ CountDownLatch terminateLatch, Logger log) {
+ InjectedUpgradeFinalizationExecutor<T>
+ executor =
+ new InjectedUpgradeFinalizationExecutor<>();
+ executor.configureTestInjectionFunction(haltingPoint, () -> {
+ log.info("Terminating upgrade finalization at point: {}. This is " +
+ "expected test execution.", haltingPoint);
+ terminateLatch.countDown();
+ return true;
+ });
+
+ return executor;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index 5b6272c2ab..bb4c5075b5 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.upgrade;
import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_VALIDATION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
@@ -55,6 +56,7 @@ public class DataNodeUpgradeFinalizer extends
logAndEmit(msg);
throw new UpgradeException(msg, PREFINALIZE_VALIDATION_FAILED);
}
+ getVersionManager().setUpgradeState(FINALIZATION_IN_PROGRESS);
}
private boolean canFinalizeDataNode(DatanodeStateMachine dsm) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
index dc1ca87ee0..de15514839 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -250,10 +250,10 @@ public final class SCMContext {
}
}
- public boolean isFinalizationCheckpointCrossed(FinalizationCheckpoint query)
{
+ public FinalizationCheckpoint getFinalizationCheckpoint() {
lock.readLock().lock();
try {
- return this.finalizationCheckpoint.hasCrossed(query);
+ return this.finalizationCheckpoint;
} finally {
lock.readLock().unlock();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
index bb12df6ec0..71cdd92774 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -60,12 +60,16 @@ public class SCMHAInvocationHandler implements
InvocationHandler {
@Override
public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
+ // Javadoc for InvocationHandler#invoke specifies that args will be null
+ // if the method takes no arguments. Convert this to an empty array for
+ // easier handling.
+ Object[] convertedArgs = (args == null) ? new Object[]{} : args;
try {
long startTime = Time.monotonicNow();
final Object result =
ratisHandler != null && method.isAnnotationPresent(Replicate.class) ?
- invokeRatis(method, args) :
- invokeLocal(method, args);
+ invokeRatis(method, convertedArgs) :
+ invokeLocal(method, convertedArgs);
LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() -
startTime);
return result;
} catch (InvocationTargetException iEx) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 16b32735aa..934c25e980 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -383,6 +383,9 @@ public class SCMHAManagerImpl implements SCMHAManager {
}
scm.getScmCertificateServer().reinitialize(metadataStore);
}
+ // This call also performs upgrade finalization if the new table contains a
+ // higher metadata layout version than the SCM's current one.
+ scm.getFinalizationManager().reinitialize(metadataStore.getMetaTable());
}
@VisibleForTesting
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 74074bc454..145a40a665 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -328,6 +328,7 @@ public class SCMStateMachine extends BaseStateMachine {
.isLeaderReady()) {
scm.getScmContext().setLeaderReady();
scm.getSCMServiceManager().notifyStatusChanged();
+ scm.getFinalizationManager().onLeaderReady();
}
// Means all transactions before this term have been applied.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index da4337f68a..b8a83e20b9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -46,7 +46,7 @@ import
org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.node.states.NodeStateMap;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -216,8 +216,8 @@ public class NodeStateManager implements Runnable,
Closeable {
// to healthy readonly until SCM finishes updating its MLV, hence the
// checkpoint check here.
layoutMisMatchCondition = (layout) ->
- scmContext.isFinalizationCheckpointCrossed(
- FinalizationCheckpoint.MLV_EQUALS_SLV) &&
+ FinalizationManager.shouldTellDatanodesToFinalize(
+ scmContext.getFinalizationCheckpoint()) &&
!layoutMatchCondition.test(layout);
scheduleNextHealthCheck();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 78a1b632f2..62ef30d27c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
-import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.ipc.Server;
@@ -655,8 +655,8 @@ public class SCMNodeManager implements NodeManager {
datanodeDetails.getHostName(), dnSlv, scmSlv);
}
- if (scmContext.isFinalizationCheckpointCrossed(
- FinalizationCheckpoint.MLV_EQUALS_SLV)) {
+ if (FinalizationManager.shouldTellDatanodesToFinalize(
+ scmContext.getFinalizationCheckpoint())) {
// Because we have crossed the MLV_EQUALS_SLV checkpoint, SCM metadata
// layout version will not change. We can now compare it to the
// datanodes' metadata layout versions to tell them to finalize.
@@ -675,16 +675,19 @@ public class SCMNodeManager implements NodeManager {
LayoutVersionProto.newBuilder()
.setSoftwareLayoutVersion(dnSlv)
.setMetadataLayoutVersion(dnSlv).build());
- try {
- finalizeCmd.setTerm(scmContext.getTermOfLeader());
-
- // Send Finalize command to the data node. Its OK to
- // send Finalize command multiple times.
- scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(datanodeDetails.getUuid(),
finalizeCmd));
- } catch (NotLeaderException ex) {
- LOG.warn("Skip sending finalize upgrade command since current SCM" +
- "is not leader.", ex);
+ if (scmContext.isLeader()) {
+ try {
+ finalizeCmd.setTerm(scmContext.getTermOfLeader());
+
+ // Send Finalize command to the data node. Its OK to
+ // send Finalize command multiple times.
+ scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(datanodeDetails.getUuid(),
+ finalizeCmd));
+ } catch (NotLeaderException ex) {
+ LOG.warn("Skip sending finalize upgrade command since current SCM"
+
+ " is not leader.", ex);
+ }
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index cf3ab0baf5..d8a1a9403f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -109,9 +109,6 @@ public class BackgroundPipelineCreator implements
SCMService {
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
-
- // start RatisPipelineUtilsThread
- start();
}
/**
@@ -162,6 +159,10 @@ public class BackgroundPipelineCreator implements
SCMService {
}
}
+ public boolean isRunning() {
+ return running.get();
+ }
+
private void run() {
while (running.get()) {
if (shouldRun()) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 09816b6b92..b1c3032504 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -191,6 +191,8 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean {
*/
void resumePipelineCreation();
+ boolean isPipelineCreationFrozen();
+
/**
* Acquire read lock.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index 3e5d86d0af..aaa3088f9a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -155,6 +156,16 @@ public class PipelineManagerImpl implements
PipelineManager {
BackgroundPipelineCreator backgroundPipelineCreator =
new BackgroundPipelineCreator(pipelineManager, conf, scmContext,
clock);
+ pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+ serviceManager.register(backgroundPipelineCreator);
+
+ if (FinalizationManager.shouldCreateNewPipelines(
+ scmContext.getFinalizationCheckpoint())) {
+ pipelineManager.resumePipelineCreation();
+ } else {
+ pipelineManager.freezePipelineCreation();
+ }
+
final long scrubberIntervalInMillis = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL,
ScmConfigKeys.OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT,
@@ -178,10 +189,7 @@ public class PipelineManagerImpl implements
PipelineManager {
}
}).build();
- pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
pipelineManager.setBackgroundPipelineScrubber(backgroundPipelineScrubber);
-
- serviceManager.register(backgroundPipelineCreator);
serviceManager.register(backgroundPipelineScrubber);
return pipelineManager;
@@ -686,6 +694,12 @@ public class PipelineManagerImpl implements
PipelineManager {
backgroundPipelineCreator.start();
}
+ @Override
+ public boolean isPipelineCreationFrozen() {
+ return freezePipelineCreation.get() &&
+ !backgroundPipelineCreator.isRunning();
+ }
+
@Override
public void close() throws IOException {
if (backgroundPipelineCreator != null) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 3e735b454b..02e5fdc430 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -25,9 +25,11 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -53,12 +55,14 @@ public class HealthyPipelineSafeModeRule extends
SafeModeExitRule<Pipeline> {
private final Set<PipelineID> processedPipelineIDs = new HashSet<>();
private final PipelineManager pipelineManager;
private final int minHealthyPipelines;
+ private final SCMContext scmContext;
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
- PipelineManager pipelineManager,
- SCMSafeModeManager manager, ConfigurationSource configuration) {
+ PipelineManager pipelineManager, SCMSafeModeManager manager,
+ ConfigurationSource configuration, SCMContext scmContext) {
super(manager, ruleName, eventQueue);
this.pipelineManager = pipelineManager;
+ this.scmContext = scmContext;
healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys.
HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
@@ -101,7 +105,16 @@ public class HealthyPipelineSafeModeRule extends
SafeModeExitRule<Pipeline> {
@Override
protected synchronized boolean validate() {
- return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+ boolean shouldRunSafemodeCheck =
+ FinalizationManager.shouldCreateNewPipelines(
+ scmContext.getFinalizationCheckpoint());
+ if (!shouldRunSafemodeCheck) {
+ LOG.info("All SCM pipelines are closed due to ongoing upgrade " +
+ "finalization. Bypassing healthy pipeline safemode rule.");
+ return true;
+ } else {
+ return currentHealthyPipelineCount >= healthyPipelineThresholdCount;
+ }
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a13a2d73b7..fe0c7bef77 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -140,7 +140,7 @@ public class SCMSafeModeManager implements SafeModeManager {
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
new HealthyPipelineSafeModeRule(HEALTHY_PIPELINE_EXIT_RULE,
eventQueue, pipelineManager,
- this, config);
+ this, config, scmContext);
OneReplicaPipelineSafeModeRule oneReplicaPipelineSafeModeRule =
new OneReplicaPipelineSafeModeRule(
ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, eventQueue,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
index 1db1323f24..84e662bf4e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationCheckpoint.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.scm.server.upgrade;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+
/**
* A finalization checkpoint is an abstraction over SCM's disk state,
* indicating where finalization left off so it can be resumed on leader
@@ -28,18 +30,27 @@ package org.apache.hadoop.hdds.scm.server.upgrade;
* layout version.
*/
public enum FinalizationCheckpoint {
- FINALIZATION_REQUIRED(false, true),
- FINALIZATION_STARTED(true, true),
- MLV_EQUALS_SLV(true, false),
- FINALIZATION_COMPLETE(false, false);
+ FINALIZATION_REQUIRED(false, true,
+ UpgradeFinalizer.Status.FINALIZATION_REQUIRED),
+ FINALIZATION_STARTED(true, true,
+ UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS),
+ MLV_EQUALS_SLV(true, false,
+ UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS),
+ FINALIZATION_COMPLETE(false, false,
+ UpgradeFinalizer.Status.FINALIZATION_DONE);
private final boolean needsFinalizingMark;
private final boolean needsMlvBehindSlv;
+ // The upgrade status that should be reported back to the client when this
+ // checkpoint is crossed.
+ private final UpgradeFinalizer.Status status;
FinalizationCheckpoint(boolean needsFinalizingMark,
- boolean needsMlvBehindSlv) {
+ boolean needsMlvBehindSlv,
+ UpgradeFinalizer.Status status) {
this.needsFinalizingMark = needsFinalizingMark;
this.needsMlvBehindSlv = needsMlvBehindSlv;
+ this.status = status;
}
/**
@@ -70,4 +81,8 @@ public enum FinalizationCheckpoint {
public boolean hasCrossed(FinalizationCheckpoint query) {
return this.compareTo(query) >= 0;
}
+
+ public UpgradeFinalizer.Status getStatus() {
+ return status;
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
index 7456c8ed46..49fc914fb3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
@@ -51,4 +52,19 @@ public interface FinalizationManager {
void buildUpgradeContext(NodeManager nodeManager,
PipelineManager pipelineManager,
SCMContext scmContext);
+
+ void reinitialize(Table<String, String> finalizationStore) throws
IOException;
+
+ void onLeaderReady();
+
+ static boolean shouldCreateNewPipelines(FinalizationCheckpoint checkpoint) {
+ return !checkpoint.hasCrossed(FinalizationCheckpoint.FINALIZATION_STARTED)
+ || checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+ }
+
+ static boolean shouldTellDatanodesToFinalize(
+ FinalizationCheckpoint checkpoint) {
+ return checkpoint.hasCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
index 4d759469d3..0a6c347658 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationManagerImpl.java
@@ -32,14 +32,21 @@ import
org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.Executors;
/**
* Class to initiate SCM finalization and query its progress.
*/
public class FinalizationManagerImpl implements FinalizationManager {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(FinalizationManagerImpl.class);
+
private SCMUpgradeFinalizer upgradeFinalizer;
private SCMUpgradeFinalizationContext context;
private SCMStorageConfig storage;
@@ -134,6 +141,37 @@ public class FinalizationManagerImpl implements
FinalizationManager {
return finalizationStateManager.getFinalizationCheckpoint();
}
+ @Override
+ public void reinitialize(Table<String, String> finalizationStore)
+ throws IOException {
+ finalizationStateManager.reinitialize(finalizationStore);
+ }
+
+ @Override
+ public void onLeaderReady() {
+ // Launch a background thread to drive finalization.
+ Executors.newSingleThreadExecutor().submit(() -> {
+ FinalizationCheckpoint currentCheckpoint = getCheckpoint();
+ if (currentCheckpoint.hasCrossed(
+ FinalizationCheckpoint.FINALIZATION_STARTED) &&
+ !currentCheckpoint.hasCrossed(
+ FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
+ LOG.info("SCM became leader. Resuming upgrade finalization from" +
+ " current checkpoint {}.", currentCheckpoint);
+ try {
+ finalizeUpgrade("resume-finalization-as-leader");
+ } catch (IOException ex) {
+ ExitUtils.terminate(1,
+ "Resuming upgrade finalization failed on SCM leader change.",
+ ex, true, LOG);
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("SCM became leader. No upgrade finalization action" +
+ " required for current checkpoint {}", currentCheckpoint);
+ }
+ });
+ }
+
/**
* Builds a {@link FinalizationManagerImpl}.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
index 44868715f8..fe1a8e91dd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManager.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.server.upgrade;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.utils.db.Table;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@@ -46,4 +47,10 @@ public interface FinalizationStateManager {
FinalizationCheckpoint getFinalizationCheckpoint();
void setUpgradeContext(SCMUpgradeFinalizationContext context);
+
+ /**
+ * Called on snapshot installation.
+ */
+ void reinitialize(Table<String, String> newFinalizationStore)
+ throws IOException;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
index 786612628a..958343cb40 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/FinalizationStateManagerImpl.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hdds.scm.server.upgrade;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -41,10 +44,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class FinalizationStateManagerImpl implements FinalizationStateManager {
- private static final Logger LOG =
+ @VisibleForTesting
+ public static final Logger LOG =
LoggerFactory.getLogger(FinalizationStateManagerImpl.class);
- private final Table<String, String> finalizationStore;
+ private Table<String, String> finalizationStore;
private final DBTransactionBuffer transactionBuffer;
private final HDDSLayoutVersionManager versionManager;
// Ensures that we are not in the process of updating checkpoint state as
@@ -62,13 +66,51 @@ public class FinalizationStateManagerImpl implements
FinalizationStateManager {
this.upgradeFinalizer = builder.upgradeFinalizer;
this.versionManager = this.upgradeFinalizer.getVersionManager();
this.checkpointLock = new ReentrantReadWriteLock();
+ initialize();
+ }
+
+ private void initialize() throws IOException {
this.hasFinalizingMark =
finalizationStore.isExist(OzoneConsts.FINALIZING_KEY);
}
+ private void publishCheckpoint(FinalizationCheckpoint checkpoint) {
+ // Move the upgrade status according to this checkpoint. This is sent
+ // back to the client if they query for the current upgrade status.
+ versionManager.setUpgradeState(checkpoint.getStatus());
+
+ // Check whether this checkpoint change requires us to move node state.
+ // If this is necessary, it must be done before unfreezing pipeline
+ // creation to make sure nodes are not added to pipelines based on
+ // outdated layout information.
+ // This operation is not idempotent.
+ if (checkpoint == FinalizationCheckpoint.MLV_EQUALS_SLV) {
+ upgradeContext.getNodeManager().forceNodesToHealthyReadOnly();
+ }
+
+ // Check whether this checkpoint change requires us to freeze pipeline
+ // creation. These are idempotent operations.
+ PipelineManager pipelineManager = upgradeContext.getPipelineManager();
+ if (FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
+ pipelineManager.isPipelineCreationFrozen()) {
+ pipelineManager.resumePipelineCreation();
+ } else if (!FinalizationManager.shouldCreateNewPipelines(checkpoint) &&
+ !pipelineManager.isPipelineCreationFrozen()) {
+ pipelineManager.freezePipelineCreation();
+ }
+
+ // Set the checkpoint in the SCM context so other components can read it.
+ upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
+ }
+
@Override
public void setUpgradeContext(SCMUpgradeFinalizationContext context) {
this.upgradeContext = context;
+ FinalizationCheckpoint checkpoint = getFinalizationCheckpoint();
+ upgradeContext.getSCMContext().setFinalizationCheckpoint(checkpoint);
+ // Set the version manager's upgrade status (sent back to the client to
+ // identify upgrade progress) based on the current checkpoint.
+ versionManager.setUpgradeState(checkpoint.getStatus());
}
@Override
@@ -79,14 +121,22 @@ public class FinalizationStateManagerImpl implements
FinalizationStateManager {
} finally {
checkpointLock.writeLock().unlock();
}
- upgradeContext.getSCMContext().setFinalizationCheckpoint(
- FinalizationCheckpoint.FINALIZATION_STARTED);
transactionBuffer.addToBuffer(finalizationStore,
OzoneConsts.FINALIZING_KEY, "");
+ publishCheckpoint(FinalizationCheckpoint.FINALIZATION_STARTED);
}
@Override
public void finalizeLayoutFeature(Integer layoutVersion) throws IOException {
+ finalizeLayoutFeatureLocal(layoutVersion);
+ }
+
+ /**
+ * A version of finalizeLayoutFeature without the {@link Replicate}
+ * annotation that can be called by followers to finalize from a snapshot.
+ */
+ private void finalizeLayoutFeatureLocal(Integer layoutVersion)
+ throws IOException {
checkpointLock.writeLock().lock();
try {
// The VERSION file is the source of truth for the current layout
@@ -101,8 +151,7 @@ public class FinalizationStateManagerImpl implements
FinalizationStateManager {
}
if (!versionManager.needsFinalization()) {
- upgradeContext.getSCMContext().setFinalizationCheckpoint(
- FinalizationCheckpoint.MLV_EQUALS_SLV);
+ publishCheckpoint(FinalizationCheckpoint.MLV_EQUALS_SLV);
}
transactionBuffer.addToBuffer(finalizationStore,
OzoneConsts.LAYOUT_VERSION_KEY, String.valueOf(layoutVersion));
@@ -132,8 +181,7 @@ public class FinalizationStateManagerImpl implements
FinalizationStateManager {
ExitUtils.terminate(1, errorMessage, LOG);
}
- upgradeContext.getSCMContext().setFinalizationCheckpoint(
- FinalizationCheckpoint.FINALIZATION_COMPLETE);
+ publishCheckpoint(FinalizationCheckpoint.FINALIZATION_COMPLETE);
}
@Override
@@ -164,14 +212,90 @@ public class FinalizationStateManagerImpl implements
FinalizationStateManager {
}
}
- String errorMessage = String.format("SCM upgrade finalization " +
- "is in an unknown state.%nFinalizing mark present? %b%n" +
- "Metadata layout version behind software layout version? %b",
- hasFinalizingMarkSnapshot, mlvBehindSlvSnapshot);
- Preconditions.checkNotNull(currentCheckpoint, errorMessage);
+ // SCM cannot function if it does not know which finalization checkpoint
+ // it is on, so it must terminate. This should only happen in the case of
+ // a serious bug.
+ if (currentCheckpoint == null) {
+ String errorMessage = String.format("SCM upgrade finalization " +
+ "is in an unknown state.%nFinalizing mark present? %b%n" +
+ "Metadata layout version behind software layout version? %b",
+ hasFinalizingMarkSnapshot, mlvBehindSlvSnapshot);
+ ExitUtils.terminate(1, errorMessage, LOG);
+ }
+
return currentCheckpoint;
}
+ /**
+ * Called on snapshot installation.
+ */
+ @Override
+ public void reinitialize(Table<String, String> newFinalizationStore)
+ throws IOException {
+ checkpointLock.writeLock().lock();
+ try {
+ this.finalizationStore.close();
+ this.finalizationStore = newFinalizationStore;
+ initialize();
+
+ int dbLayoutVersion = getDBLayoutVersion();
+ int currentLayoutVersion = versionManager.getMetadataLayoutVersion();
+ if (currentLayoutVersion < dbLayoutVersion) {
+ // Snapshot contained a higher metadata layout version. Finalize this
+ // follower SCM as a result.
+ LOG.info("New SCM snapshot received with metadata layout version {}, "
+
+ "which is higher than this SCM's metadata layout version {}." +
+ " Attempting to finalize current SCM to that version.",
+ dbLayoutVersion, currentLayoutVersion);
+ // Since the SCM is finalizing from a snapshot, it is a follower, and
+ // does not need to run the leader only finalization driving actions
+ // that the UpgradeFinalizationExecutor contains. Just run the
+ // upgrade actions for the layout features, set the finalization
+ // checkpoint, and increase the version in the VERSION file.
+ for (int version = currentLayoutVersion + 1; version <=
dbLayoutVersion;
+ version++) {
+ finalizeLayoutFeatureLocal(version);
+ }
+ }
+ publishCheckpoint(getFinalizationCheckpoint());
+ } catch (Exception ex) {
+ LOG.error("Failed to reinitialize finalization state", ex);
+ throw new IOException(ex);
+ } finally {
+ checkpointLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the metadata layout version from the SCM RocksDB. This is used for
+ * Ratis snapshot based finalization in a slow follower. In all other
+ * cases, the VERSION file should be the source of truth.
+ *
+ * MLV was not stored in RocksDB until SCM HA supported snapshot based
+ * finalization, which was after a few HDDS layout features
+ * were introduced. If the SCM has not finalized since this code
+ * was added, the layout version will not be there. Defer to the MLV in the
+ * VERSION file in this case, since finalization is not ongoing. The key will
+ * be added once finalization is started with this software version.
+ */
+ private int getDBLayoutVersion() throws IOException {
+ String dbLayoutVersion = finalizationStore.get(
+ OzoneConsts.LAYOUT_VERSION_KEY);
+ if (dbLayoutVersion == null) {
+ return versionManager.getMetadataLayoutVersion();
+ } else {
+ try {
+ return Integer.parseInt(dbLayoutVersion);
+ } catch (NumberFormatException ex) {
+ String msg = String.format(
+ "Failed to read layout version from SCM DB. Found string %s",
+ dbLayoutVersion);
+ LOG.error(msg, ex);
+ throw new IOException(msg, ex);
+ }
+ }
+ }
+
/**
* Builds a {@link FinalizationManagerImpl}.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index fa3520ee38..b65cc00706 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature;
import org.apache.hadoop.ozone.upgrade.UpgradeException;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
/**
* UpgradeFinalizer for the Storage Container Manager service.
@@ -59,19 +61,6 @@ public class SCMUpgradeFinalizer extends
LOG.info("SCM Finalization has crossed checkpoint {}", checkpoint);
}
- @Override
- protected boolean componentFinishedFinalizationSteps(
- SCMUpgradeFinalizationContext context) {
- // By default, the parent class will mark finalization as complete when
- // MLV == SLV. However, for SCM, there are a few extra steps that need to
- // be done after this point (see postFinalizeUpgrade). If there is a
- // leader change or restart after MLV == SLV but before running these
- // post finalize steps, we must tell the parent to run finalization
- // even though MLV == SLV.
- return context.getFinalizationStateManager()
- .crossedCheckpoint(FinalizationCheckpoint.FINALIZATION_COMPLETE);
- }
-
@Override
public void preFinalizeUpgrade(SCMUpgradeFinalizationContext context)
throws IOException {
@@ -123,33 +112,24 @@ public class SCMUpgradeFinalizer extends
super.finalizeLayoutFeature(lf,
lf.scmAction(LayoutFeature.UpgradeActionType.ON_FINALIZE),
context.getStorage());
-
- if (!getVersionManager().needsFinalization()) {
- // If we just finalized the last layout feature, don 't wait for next
- // heartbeat from datanodes in order to move them to
- // Healthy - Readonly state. Force them to Healthy ReadOnly state so that
- // we can resume pipeline creation right away.
- context.getNodeManager().forceNodesToHealthyReadOnly();
- }
}
public void postFinalizeUpgrade(SCMUpgradeFinalizationContext context)
throws IOException {
- try {
- // If we reached this phase of finalization, all layout features should
- // be finalized.
- logCheckpointCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
- FinalizationStateManager stateManager =
- context.getFinalizationStateManager();
- if (!stateManager.crossedCheckpoint(
- FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
- createPipelinesAfterFinalization(context.getPipelineManager());
+ // If we reached this phase of finalization, all layout features should
+ // be finalized.
+ logCheckpointCrossed(FinalizationCheckpoint.MLV_EQUALS_SLV);
+ FinalizationStateManager stateManager =
+ context.getFinalizationStateManager();
+ if (!stateManager.crossedCheckpoint(
+ FinalizationCheckpoint.FINALIZATION_COMPLETE)) {
+ createPipelinesAfterFinalization(context);
+ // @Replicate methods are required to throw TimeoutException.
+ try {
stateManager.removeFinalizingMark();
+ } catch (TimeoutException ex) {
+ throw new IOException(ex);
}
- logCheckpointCrossed(FinalizationCheckpoint.FINALIZATION_COMPLETE);
- } catch (TimeoutException ex) {
- LOG.error("TimeoutException during postFinalizeUpgrade", ex);
- throw new IOException(ex);
}
}
@@ -172,8 +152,13 @@ public class SCMUpgradeFinalizer extends
msg += "\n New pipelines creation will remain frozen until Upgrade " +
"is finalized.";
- // Pipeline creation will remain frozen until postFinalizeUpgrade()
- pipelineManager.freezePipelineCreation();
+ // Pipeline creation should already be frozen when the finalization state
+ // manager set the checkpoint.
+ if (!pipelineManager.isPipelineCreationFrozen()) {
+ throw new SCMException("Error during finalization. Pipeline creation " +
+ "should have been frozen before closing existing pipelines.",
+ SCMException.ResultCodes.INTERNAL_ERROR);
+ }
for (Pipeline pipeline : pipelineManager.getPipelines()) {
if (pipeline.getPipelineState() != CLOSED) {
@@ -194,13 +179,25 @@ public class SCMUpgradeFinalizer extends
}
private void createPipelinesAfterFinalization(
- PipelineManager pipelineManager) {
- pipelineManager.resumePipelineCreation();
+ SCMUpgradeFinalizationContext context) throws SCMException,
+ NotLeaderException {
+ // Pipeline creation should already be resumed when the finalization state
+ // manager set the checkpoint.
+ PipelineManager pipelineManager = context.getPipelineManager();
+ if (pipelineManager.isPipelineCreationFrozen()) {
+ throw new SCMException("Error during finalization. Pipeline creation " +
+ "should have been resumed before waiting for new pipelines.",
+ SCMException.ResultCodes.INTERNAL_ERROR);
+ }
// Wait for at least one pipeline to be created before finishing
// finalization, so clients can write.
boolean hasPipeline = false;
while (!hasPipeline) {
+ // Break out of the wait and step down from driving finalization if this
+ // SCM is no longer the leader by throwing NotLeaderException.
+ context.getSCMContext().getTermOfLeader();
+
ReplicationConfig ratisThree =
ReplicationConfig.fromProtoTypeAndFactor(
HddsProtos.ReplicationType.RATIS,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index efa4bed92e..dd5cf9b401 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -315,4 +315,9 @@ public class MockPipelineManager implements PipelineManager
{
public void releaseWriteLock() {
}
+
+ @Override
+ public boolean isPipelineCreationFrozen() {
+ return false;
+ }
}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
index 49012abc61..11174a0341 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/upgrade/TestScmFinalization.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
+
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
@@ -61,6 +62,10 @@ public class TestScmFinalization {
private static final Logger LOG =
LoggerFactory.getLogger(TestScmFinalization.class);
+ // Indicates the current state of the mock pipeline manager's pipeline
+ // creation.
+ private boolean pipelineCreationFrozen = false;
+
/**
* Order of finalization checkpoints within the enum is used to determine
* which ones have been passed. If ordering within the enum is changed
@@ -89,6 +94,8 @@ public class TestScmFinalization {
HDDSLayoutVersionManager versionManager =
new HDDSLayoutVersionManager(
HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ PipelineManager pipelineManager =
+ getMockPipelineManager(FinalizationCheckpoint.FINALIZATION_REQUIRED);
// State manager keeps upgrade information in memory as well as writing
// it to disk, so we can mock the classes that handle disk ops for this
// test.
@@ -111,7 +118,7 @@ public class TestScmFinalization {
.setStorage(Mockito.mock(SCMStorageConfig.class))
.setLayoutVersionManager(versionManager)
.setSCMContext(scmContext)
- .setPipelineManager(Mockito.mock(PipelineManager.class))
+ .setPipelineManager(pipelineManager)
.setNodeManager(Mockito.mock(NodeManager.class))
.build();
stateManager.setUpgradeContext(context);
@@ -149,7 +156,9 @@ public class TestScmFinalization {
FinalizationStateManager stateManager,
FinalizationCheckpoint expectedCheckpoint) {
- assertTrue(context.isFinalizationCheckpointCrossed(expectedCheckpoint));
+ // SCM context should have been updated with the current checkpoint.
+ assertTrue(context.getFinalizationCheckpoint()
+ .hasCrossed(expectedCheckpoint));
for (FinalizationCheckpoint checkpoint: FinalizationCheckpoint.values()) {
LOG.info("Comparing expected checkpoint {} to {}", expectedCheckpoint,
checkpoint);
@@ -179,12 +188,6 @@ public class TestScmFinalization {
LOG.info("Testing finalization beginning at checkpoint {}",
initialCheckpoint);
- PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
- // After finalization, SCM will wait for at least one pipeline to be
- // created. It does not care about the contents of the pipeline list, so
- // just return something with length >= 1.
- Mockito.when(pipelineManager.getPipelines(Mockito.any(),
- Mockito.any())).thenReturn(Arrays.asList(null, null, null));
// Create the table and version manager to appear as if we left off from in
// progress finalization.
Table<String, String> finalizationStore =
@@ -198,6 +201,8 @@ public class TestScmFinalization {
SCMStorageConfig storage = Mockito.mock(SCMStorageConfig.class);
SCMContext scmContext = SCMContext.emptyContext();
scmContext.setFinalizationCheckpoint(initialCheckpoint);
+ PipelineManager pipelineManager =
+ getMockPipelineManager(initialCheckpoint);
FinalizationStateManager stateManager =
new FinalizationStateManagerTestImpl.Builder()
@@ -242,13 +247,13 @@ public class TestScmFinalization {
ArgumentMatchers.matches(OzoneConsts.FINALIZING_KEY),
ArgumentMatchers.matches(""));
+ // Next, all pipeline creation should be stopped.
+ inOrder.verify(pipelineManager, count).freezePipelineCreation();
+
if (initialCheckpoint == FinalizationCheckpoint.FINALIZATION_STARTED) {
count = Mockito.times(1);
}
- // Next, all pipeline creation should be stopped.
- inOrder.verify(pipelineManager, count).freezePipelineCreation();
-
// Next, each layout feature should be finalized.
for (HDDSLayoutFeature feature: HDDSLayoutFeature.values()) {
// Cannot finalize initial version since we are already there.
@@ -330,4 +335,29 @@ public class TestScmFinalization {
return UpgradeFinalizer.STARTING_MSG;
}
}
+
+ private PipelineManager getMockPipelineManager(
+ FinalizationCheckpoint initialCheckpoint) {
+ PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+ // After finalization, SCM will wait for at least one pipeline to be
+ // created. It does not care about the contents of the pipeline list, so
+ // just return something with length >= 1.
+ Mockito.when(pipelineManager.getPipelines(Mockito.any(),
+ Mockito.any())).thenReturn(Arrays.asList(null, null, null));
+
+ // Set the initial value for pipeline creation based on the checkpoint.
+ // In a real cluster, this would be set on startup of the
+ // PipelineManagerImpl.
+ pipelineCreationFrozen =
+ !FinalizationManager.shouldCreateNewPipelines(initialCheckpoint);
+ Mockito.doAnswer(args -> pipelineCreationFrozen = true)
+ .when(pipelineManager).freezePipelineCreation();
+ Mockito.doAnswer(args -> pipelineCreationFrozen = false)
+ .when(pipelineManager).resumePipelineCreation();
+
+ Mockito.doAnswer(args -> pipelineCreationFrozen)
+ .when(pipelineManager).isPipelineCreationFrozen();
+
+ return pipelineManager;
+ }
}
diff --git
a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 87a852b875..263da48ae2 100644
---
a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++
b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.failure.FailureManager;
@@ -104,7 +105,8 @@ public class MiniOzoneChaosCluster extends
MiniOzoneHAClusterImpl {
OMHAService omService, SCMHAService scmService,
List<HddsDatanodeService> hddsDatanodes, String clusterPath,
Set<Class<? extends Failures>> clazzes) {
- super(conf, omService, scmService, hddsDatanodes, clusterPath, null);
+ super(conf, new SCMConfigurator(), omService, scmService, hddsDatanodes,
+ clusterPath, null);
this.numDatanodes = getHddsDatanodes().size();
this.numOzoneManagers = omService.getServices().size();
this.numStorageContainerManagers = scmService.getServices().size();
@@ -425,11 +427,12 @@ public class MiniOzoneChaosCluster extends
MiniOzoneHAClusterImpl {
failedScmSet.add(scm);
}
- public void restartStorageContainerManager(StorageContainerManager scm,
- boolean waitForScm) throws IOException, TimeoutException,
- InterruptedException, AuthenticationException {
- super.restartStorageContainerManager(scm, waitForScm);
+ public StorageContainerManager restartStorageContainerManager(
+ StorageContainerManager scm, boolean waitForScm)
+ throws IOException, TimeoutException, InterruptedException,
+ AuthenticationException {
failedScmSet.remove(scm);
+ return super.restartStorageContainerManager(scm, waitForScm);
}
// Should the selected node be stopped or started.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
index 766c479155..7568907dc5 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
@@ -22,7 +22,6 @@ import static java.lang.Thread.sleep;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
-import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
@@ -38,9 +37,7 @@ import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_F
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -53,15 +50,12 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -72,11 +66,11 @@ import
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneClusterProvider;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
@@ -84,7 +78,6 @@ import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import
org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.junit.jupiter.api.AfterEach;
@@ -114,7 +107,7 @@ public class TestHDDSUpgrade {
private static final int NUM_DATA_NODES = 3;
private static final int NUM_SCMS = 3;
- private MiniOzoneCluster cluster;
+ private MiniOzoneHAClusterImpl cluster;
private OzoneConfiguration conf;
private StorageContainerManager scm;
private ContainerManager scmContainerManager;
@@ -158,7 +151,8 @@ public class TestHDDSUpgrade {
SCMConfigurator scmConfigurator = new SCMConfigurator();
scmConfigurator.setUpgradeFinalizationExecutor(scmFinalizationExecutor);
- MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+ MiniOzoneCluster.Builder builder =
+ new MiniOzoneHAClusterImpl.Builder(conf)
.setNumDatanodes(NUM_DATA_NODES)
.setNumOfStorageContainerManagers(NUM_SCMS)
.setSCMConfigurator(scmConfigurator)
@@ -183,7 +177,7 @@ public class TestHDDSUpgrade {
}
public void init() throws Exception {
- cluster = clusterProvider.provide();
+ cluster = (MiniOzoneHAClusterImpl) clusterProvider.provide();
conf = cluster.getConf();
loadSCMState();
}
@@ -227,143 +221,6 @@ public class TestHDDSUpgrade {
key.close();
}
- /*
- * Helper function to test Pre-Upgrade conditions on the SCM
- */
- private void testPreUpgradeConditionsSCM() {
- Assert.assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
- scmVersionManager.getMetadataLayoutVersion());
- for (ContainerInfo ci : scmContainerManager.getContainers()) {
- Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, ci.getState());
- }
- }
-
- /*
- * Helper function to test Post-Upgrade conditions on the SCM
- */
- private void testPostUpgradeConditionsSCM() {
- loadSCMState();
- Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
- scmVersionManager.getMetadataLayoutVersion());
- Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
-
- // SCM should not return from finalization until there is at least one
- // pipeline to use.
- try {
- GenericTestUtils.waitFor(() -> {
- int pipelineCount = scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
- .size();
- if (pipelineCount >= 1) {
- return true;
- }
- return false;
- }, 500, 60000);
- } catch (TimeoutException | InterruptedException e) {
- Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
- }
-
- // SCM will not return from finalization until there is at least one
- // RATIS 3 pipeline. For this to exist, all three of our datanodes must
- // be in the HEALTHY state.
- testDataNodesStateOnSCM(HEALTHY, HEALTHY_READONLY);
-
- int countContainers = 0;
- for (ContainerInfo ci : scmContainerManager.getContainers()) {
- HddsProtos.LifeCycleState ciState = ci.getState();
- LOG.info("testPostUpgradeConditionsSCM: container state is {}",
- ciState.name());
- Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
- (ciState == HddsProtos.LifeCycleState.CLOSING) ||
- (ciState == HddsProtos.LifeCycleState.DELETING) ||
- (ciState == HddsProtos.LifeCycleState.DELETED) ||
- (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
- countContainers++;
- }
- Assert.assertTrue(countContainers >= numContainersCreated);
- }
-
- /*
- * Helper function to test Pre-Upgrade conditions on all the DataNodes.
- */
- private void testPreUpgradeConditionsDataNodes() {
- for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
- DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
- HDDSLayoutVersionManager dnVersionManager =
- dsm.getLayoutVersionManager();
- Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
- }
-
- int countContainers = 0;
- for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
- DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
- // Also verify that all the existing containers are open.
- for (Iterator<Container<?>> it =
- dsm.getContainer().getController().getContainers(); it.hasNext();) {
- Container container = it.next();
- Assert.assertTrue(container.getContainerState() ==
- ContainerProtos.ContainerDataProto.State.OPEN);
- countContainers++;
- }
- }
- Assert.assertTrue(countContainers >= 1);
- }
-
- /*
- * Helper function to test Post-Upgrade conditions on all the DataNodes.
- */
- private void testPostUpgradeConditionsDataNodes(
- ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
- List<ContainerProtos.ContainerDataProto.State> closeStates =
- Arrays.asList(validClosedContainerStates);
- // Allow closed and quasi closed containers as valid closed containers by
- // default.
- if (closeStates.isEmpty()) {
- closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
- }
-
- try {
- GenericTestUtils.waitFor(() -> {
- for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
- DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
- try {
- if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
- (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
- return false;
- }
- } catch (IOException e) {
- LOG.error("Exception. ", e);
- return false;
- }
- }
- return true;
- }, 500, 60000);
- } catch (TimeoutException | InterruptedException e) {
- Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
- }
-
- int countContainers = 0;
- for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
- DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
- HDDSLayoutVersionManager dnVersionManager =
- dsm.getLayoutVersionManager();
- Assert.assertEquals(dnVersionManager.getSoftwareLayoutVersion(),
- dnVersionManager.getMetadataLayoutVersion());
- Assert.assertTrue(dnVersionManager.getMetadataLayoutVersion() >= 1);
-
- // Also verify that all the existing containers are closed.
- for (Iterator<Container<?>> it =
- dsm.getContainer().getController().getContainers(); it.hasNext();) {
- Container<?> container = it.next();
- Assert.assertTrue("Container had unexpected state " +
- container.getContainerState(),
- closeStates.stream().anyMatch(
- state -> container.getContainerState().equals(state)));
- countContainers++;
- }
- }
- Assert.assertTrue(countContainers >= 1);
- }
-
/*
* Helper function to test that we can create new pipelines Post-Upgrade.
*/
@@ -379,31 +236,6 @@ public class TestHDDSUpgrade {
Assert.assertEquals(pid, ratisPipeline1.getId());
}
- /*
- * Helper function to test DataNode state on the SCM. Note that due to
- * timing constraints, sometime the node-state can transition to the next
- * state. This function expects the DataNode to be in NodeState "state" or
- * "alternateState". Some tests can enforce a unique NodeState test by
- * setting "alternateState = null".
- */
- private void testDataNodesStateOnSCM(NodeState state,
- NodeState alternateState) {
- int countNodes = 0;
- for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
- try {
- NodeState dnState =
- scm.getScmNodeManager().getNodeStatus(dn).getHealth();
- Assert.assertTrue((dnState == state) ||
- (alternateState == null ? false : dnState == alternateState));
- } catch (NodeNotFoundException e) {
- e.printStackTrace();
- Assert.fail("Node not found");
- }
- ++countNodes;
- }
- Assert.assertEquals(NUM_DATA_NODES, countNodes);
- }
-
/*
* Helper function to wait for Pipeline creation.
*/
@@ -443,8 +275,10 @@ public class TestHDDSUpgrade {
createTestContainers();
// Test the Pre-Upgrade conditions on SCM as well as DataNodes.
- testPreUpgradeConditionsSCM();
- testPreUpgradeConditionsDataNodes();
+ TestHddsUpgradeUtils.testPreUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList());
+ TestHddsUpgradeUtils.testPreUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes());
Set<PipelineID> preUpgradeOpenPipelines =
scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
@@ -458,11 +292,8 @@ public class TestHDDSUpgrade {
Assert.assertEquals(STARTING_FINALIZATION, status.status());
// Wait for the Finalization to complete on the SCM.
- while (status.status() != FINALIZATION_DONE) {
- status = scm.getFinalizationManager()
- .queryUpgradeFinalizationProgress("xyz",
- false, false);
- }
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(
+ cluster.getStorageContainerLocationClient(), "xyz");
Set<PipelineID> postUpgradeOpenPipelines =
scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
@@ -479,15 +310,20 @@ public class TestHDDSUpgrade {
Assert.assertEquals(0, numPreUpgradeOpenPipelines);
// Verify Post-Upgrade conditions on the SCM.
- testPostUpgradeConditionsSCM();
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList(),
+ numContainersCreated, NUM_DATA_NODES);
// All datanodes on the SCM should have moved to HEALTHY-READONLY state.
- testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+ TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+ cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+ HEALTHY_READONLY, HEALTHY);
// Verify the SCM has driven all the DataNodes through Layout Upgrade.
// In the happy path case, no containers should have been quasi closed as
// a result of the upgrade.
- testPostUpgradeConditionsDataNodes(CLOSED);
+ TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes(), numContainersCreated, CLOSED);
// Test that we can use a pipeline after upgrade.
// Will fail with exception if there are no pipelines.
@@ -568,7 +404,7 @@ public class TestHDDSUpgrade {
}
cluster.waitForClusterToBeReady();
} catch (Exception e) {
- LOG.info("DataNode Restarts Failed!");
+ LOG.error("DataNode Restarts Failed!", e);
testPassed.set(false);
}
loadSCMState();
@@ -997,8 +833,10 @@ public class TestHDDSUpgrade {
createKey();
// Test the Pre-Upgrade conditions on SCM as well as DataNodes.
- testPreUpgradeConditionsSCM();
- testPreUpgradeConditionsDataNodes();
+ TestHddsUpgradeUtils.testPreUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList());
+ TestHddsUpgradeUtils.testPreUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes());
// Trigger Finalization on the SCM
StatusAndMessages status =
@@ -1027,18 +865,24 @@ public class TestHDDSUpgrade {
// Verify Post-Upgrade conditions on the SCM.
// With failure injection
- testPostUpgradeConditionsSCM();
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList(), numContainersCreated,
+ NUM_DATA_NODES);
// All datanodes on the SCM should have moved to HEALTHY-READONLY state.
// Due to timing constraint also allow a "HEALTHY" state.
loadSCMState();
- testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+ TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+ cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+ HEALTHY_READONLY, HEALTHY);
// Need to wait for post finalization heartbeat from DNs.
LambdaTestUtils.await(600000, 500, () -> {
try {
loadSCMState();
- testDataNodesStateOnSCM(HEALTHY, null);
+ TestHddsUpgradeUtils.testDataNodesStateOnSCM(
+ cluster.getStorageContainerManagersList(), NUM_DATA_NODES,
+ HEALTHY, null);
sleep(100);
} catch (Throwable ex) {
LOG.info(ex.getMessage());
@@ -1048,7 +892,8 @@ public class TestHDDSUpgrade {
});
// Verify the SCM has driven all the DataNodes through Layout Upgrade.
- testPostUpgradeConditionsDataNodes();
+ TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes(), numContainersCreated);
// Verify that new pipeline can be created with upgraded datanodes.
try {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
new file mode 100644
index 0000000000..ccedede134
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHddsUpgradeUtils.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.upgrade;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.LambdaTestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+
+/**
+ * Helper methods for testing HDDS upgrade finalization in integration tests.
+ */
+public final class TestHddsUpgradeUtils {
+
+ private TestHddsUpgradeUtils() { }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestHddsUpgradeUtils.class);
+
+ private static final ReplicationConfig RATIS_THREE =
+
ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+
+ public static void waitForFinalizationFromClient(
+ StorageContainerLocationProtocol scmClient, String clientID)
+ throws Exception {
+ LambdaTestUtils.await(60_000, 1_000, () -> {
+ UpgradeFinalizer.Status status = scmClient
+ .queryUpgradeFinalizationProgress(clientID, true, true)
+ .status();
+ LOG.info("Waiting for upgrade finalization to complete from client." +
+ " Current status is {}.", status);
+ return status == FINALIZATION_DONE || status == ALREADY_FINALIZED;
+ });
+ }
+
+ /*
+ * Helper function to test Pre-Upgrade conditions on the SCM
+ */
+ public static void testPreUpgradeConditionsSCM(
+ List<StorageContainerManager> scms) {
+ for (StorageContainerManager scm : scms) {
+ Assert.assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
+ scm.getLayoutVersionManager().getMetadataLayoutVersion());
+ for (ContainerInfo ci : scm.getContainerManager()
+ .getContainers()) {
+ Assert.assertEquals(HddsProtos.LifeCycleState.OPEN, ci.getState());
+ }
+ }
+ }
+
+ /*
+ * Helper function to test Post-Upgrade conditions on the SCM
+ */
+ public static void testPostUpgradeConditionsSCM(
+ List<StorageContainerManager> scms, int numContainers, int numDatanodes)
{
+ for (StorageContainerManager scm : scms) {
+ LOG.info("Testing post upgrade conditions on SCM with node ID: {}",
+ scm.getSCMNodeId());
+ testPostUpgradeConditionsSCM(scm, numContainers, numDatanodes);
+ }
+ }
+
+ public static void testPostUpgradeConditionsSCM(StorageContainerManager scm,
+ int numContainers, int numDatanodes) {
+
+ Assert.assertTrue(scm.getScmContext().getFinalizationCheckpoint()
+ .hasCrossed(FinalizationCheckpoint.FINALIZATION_COMPLETE));
+
+ HDDSLayoutVersionManager scmVersionManager = scm.getLayoutVersionManager();
+ Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
+ scmVersionManager.getMetadataLayoutVersion());
+ Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
+
+ // SCM should not return from finalization until there is at least one
+ // pipeline to use.
+ PipelineManager scmPipelineManager = scm.getPipelineManager();
+ try {
+ GenericTestUtils.waitFor(() -> {
+ int pipelineCount = scmPipelineManager.getPipelines(RATIS_THREE, OPEN)
+ .size();
+ if (pipelineCount >= 1) {
+ return true;
+ }
+ return false;
+ }, 500, 60000);
+ } catch (TimeoutException | InterruptedException e) {
+ Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
+ }
+
+ // SCM will not return from finalization until there is at least one
+ // RATIS 3 pipeline. For this to exist, all three of our datanodes must
+ // be in the HEALTHY state.
+ testDataNodesStateOnSCM(scm, numDatanodes, HEALTHY, HEALTHY_READONLY);
+
+ int countContainers = 0;
+ for (ContainerInfo ci : scm.getContainerManager().getContainers()) {
+ HddsProtos.LifeCycleState ciState = ci.getState();
+ LOG.info("testPostUpgradeConditionsSCM: container state is {}",
+ ciState.name());
+ Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
+ (ciState == HddsProtos.LifeCycleState.CLOSING) ||
+ (ciState == HddsProtos.LifeCycleState.DELETING) ||
+ (ciState == HddsProtos.LifeCycleState.DELETED) ||
+ (ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
+ countContainers++;
+ }
+ Assert.assertTrue(countContainers >= numContainers);
+ }
+
+ /*
+ * Helper function to test Pre-Upgrade conditions on all the DataNodes.
+ */
+ public static void testPreUpgradeConditionsDataNodes(
+ List<HddsDatanodeService> datanodes) {
+ for (HddsDatanodeService dataNode : datanodes) {
+ DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+ HDDSLayoutVersionManager dnVersionManager =
+ dsm.getLayoutVersionManager();
+ Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
+ }
+
+ int countContainers = 0;
+ for (HddsDatanodeService dataNode : datanodes) {
+ DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+ // Also verify that all the existing containers are open.
+ for (Iterator<Container<?>> it =
+ dsm.getContainer().getController().getContainers(); it.hasNext();) {
+ Container container = it.next();
+ Assert.assertSame(container.getContainerState(),
+ ContainerProtos.ContainerDataProto.State.OPEN);
+ countContainers++;
+ }
+ }
+ Assert.assertTrue(countContainers >= 1);
+ }
+
+ /*
+ * Helper function to test Post-Upgrade conditions on all the DataNodes.
+ */
+ public static void testPostUpgradeConditionsDataNodes(
+ List<HddsDatanodeService> datanodes, int numContainers,
+ ContainerProtos.ContainerDataProto.State... validClosedContainerStates) {
+ List<ContainerProtos.ContainerDataProto.State> closeStates =
+ Arrays.asList(validClosedContainerStates);
+ // Allow closed and quasi closed containers as valid closed containers by
+ // default.
+ if (closeStates.isEmpty()) {
+ closeStates = Arrays.asList(CLOSED, QUASI_CLOSED);
+ }
+
+ try {
+ GenericTestUtils.waitFor(() -> {
+ for (HddsDatanodeService dataNode : datanodes) {
+ DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+ try {
+ if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
+ (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
+ return false;
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to query datanode upgrade status.", e);
+ return false;
+ }
+ }
+ return true;
+ }, 500, 60000);
+ } catch (TimeoutException | InterruptedException e) {
+ Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
+ }
+
+ int countContainers = 0;
+ for (HddsDatanodeService dataNode : datanodes) {
+ DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+ HDDSLayoutVersionManager dnVersionManager =
+ dsm.getLayoutVersionManager();
+ Assert.assertEquals(dnVersionManager.getSoftwareLayoutVersion(),
+ dnVersionManager.getMetadataLayoutVersion());
+ Assert.assertTrue(dnVersionManager.getMetadataLayoutVersion() >= 1);
+
+ // Also verify that all the existing containers are closed.
+ for (Iterator<Container<?>> it =
+ dsm.getContainer().getController().getContainers(); it.hasNext();) {
+ Container<?> container = it.next();
+ Assert.assertTrue("Container had unexpected state " +
+ container.getContainerState(),
+ closeStates.stream().anyMatch(
+ state -> container.getContainerState().equals(state)));
+ countContainers++;
+ }
+ }
+ Assert.assertTrue(countContainers >= numContainers);
+ }
+
+ public static void testDataNodesStateOnSCM(List<StorageContainerManager>
scms,
+ int expectedDatanodeCount, HddsProtos.NodeState state,
+ HddsProtos.NodeState alternateState) {
+ scms.forEach(scm -> testDataNodesStateOnSCM(scm, expectedDatanodeCount,
+ state, alternateState));
+ }
+
+ /*
+ * Helper function to test DataNode state on the SCM. Note that due to
+ * timing constraints, sometime the node-state can transition to the next
+ * state. This function expects the DataNode to be in NodeState "state" or
+ * "alternateState". Some tests can enforce a unique NodeState test by
+ * setting "alternateState = null".
+ */
+ public static void testDataNodesStateOnSCM(StorageContainerManager scm,
+ int expectedDatanodeCount, HddsProtos.NodeState state,
+ HddsProtos.NodeState alternateState) {
+ int countNodes = 0;
+ for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
+ try {
+ HddsProtos.NodeState dnState =
+ scm.getScmNodeManager().getNodeStatus(dn).getHealth();
+ Assert.assertTrue((dnState == state) ||
+ (alternateState != null && dnState == alternateState));
+ } catch (NodeNotFoundException e) {
+ e.printStackTrace();
+ Assert.fail("Node not found");
+ }
+ ++countNodes;
+ }
+ Assert.assertEquals(expectedDatanodeCount, countNodes);
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
new file mode 100644
index 0000000000..d6696b3a70
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestScmHAFinalization.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.upgrade;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationCheckpoint;
+import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationStateManagerImpl;
+import org.apache.hadoop.hdds.scm.server.upgrade.SCMUpgradeFinalizationContext;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.upgrade.DefaultUpgradeFinalizationExecutor;
+import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
+import org.apache.hadoop.ozone.upgrade.UpgradeTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+
+/**
+ * Tests upgrade finalization failure scenarios and corner cases specific to
SCM
+ * HA.
+ */
+public class TestScmHAFinalization {
+ private static final String CLIENT_ID = UUID.randomUUID().toString();
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestScmHAFinalization.class);
+ private static final String METHOD_SOURCE =
+ "org.apache.hadoop.hdds.upgrade" +
+ ".TestScmHAFinalization#injectionPointsToTest";
+
+ private StorageContainerLocationProtocol scmClient;
+ private MiniOzoneHAClusterImpl cluster;
+ private static final int NUM_DATANODES = 3;
+ private static final int NUM_SCMS = 3;
+ private Future<?> finalizationFuture;
+
+ public void init(OzoneConfiguration conf,
+ UpgradeFinalizationExecutor<SCMUpgradeFinalizationContext> executor,
+ int numInactiveSCMs) throws Exception {
+
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setUpgradeFinalizationExecutor(executor);
+
+ MiniOzoneCluster.Builder clusterBuilder =
+ new MiniOzoneHAClusterImpl.Builder(conf)
+ .setNumOfStorageContainerManagers(NUM_SCMS)
+ .setNumOfActiveSCMs(NUM_SCMS - numInactiveSCMs)
+ .setScmLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion())
+ .setSCMServiceId("scmservice")
+ .setSCMConfigurator(configurator)
+ .setNumOfOzoneManagers(1)
+ .setNumDatanodes(NUM_DATANODES)
+ .setDnLayoutVersion(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ this.cluster = (MiniOzoneHAClusterImpl) clusterBuilder.build();
+
+ scmClient = cluster.getStorageContainerLocationClient();
+ cluster.waitForClusterToBeReady();
+
+ // Launch finalization from the client. In the current implementation,
+ // this call will block until finalization completes. If the test
 + // involves restarts or leader changes, the client may be disconnected,
+ // but finalization should still proceed.
+ finalizationFuture = Executors.newSingleThreadExecutor().submit(
+ () -> {
+ try {
+ scmClient.finalizeScmUpgrade(CLIENT_ID);
+ } catch (IOException ex) {
+ LOG.info("finalization client failed. This may be expected if the"
+
+ " test injected failures.", ex);
+ }
+ });
+ }
+
+ @AfterEach
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Argument supplier for parameterized tests.
+ */
+ public static Stream<Arguments> injectionPointsToTest() {
+ // Do not test from BEFORE_PRE_FINALIZE_UPGRADE injection point.
+ // Finalization will not have started so there will be no persisted state
+ // to resume from.
+ return Stream.of(
+ Arguments.of(UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE),
+ Arguments.of(UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION),
+ Arguments.of(UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource(METHOD_SOURCE)
+ public void testFinalizationWithLeaderChange(
+ UpgradeTestInjectionPoints haltingPoint) throws Exception {
+
+ CountDownLatch pauseLatch = new CountDownLatch(1);
+ CountDownLatch unpauseLatch = new CountDownLatch(1);
+ init(new OzoneConfiguration(),
+ UpgradeTestUtils.newPausingFinalizationExecutor(haltingPoint,
+ pauseLatch, unpauseLatch, LOG), 0);
+ pauseLatch.await();
+
+ // Stop the leader, forcing a leader change in the middle of finalization.
+ // This will cause the initial client call for finalization
+ // to be interrupted.
+ StorageContainerManager oldLeaderScm = cluster.getActiveSCM();
+ LOG.info("Stopping current SCM leader {} to initiate a leader change.",
+ oldLeaderScm.getSCMNodeId());
+ cluster.shutdownStorageContainerManager(oldLeaderScm);
+
+ // While finalization is paused, check its state on the remaining SCMs.
+ checkMidFinalizationConditions(haltingPoint,
+ cluster.getStorageContainerManagersList());
+
+ // Wait for the remaining two SCMs to elect a new leader.
+ cluster.waitForClusterToBeReady();
+
+ // Restart actually creates a new SCM.
+ // Since this SCM will be a follower, the implementation of its upgrade
+ // finalization executor does not matter for this test.
+ cluster.restartStorageContainerManager(oldLeaderScm, true);
+
+ // Make sure the original SCM leader is not the leader anymore.
+ StorageContainerManager newLeaderScm = cluster.getActiveSCM();
+ Assertions.assertNotEquals(newLeaderScm.getSCMNodeId(),
+ oldLeaderScm.getSCMNodeId());
+
+ // Resume finalization from the new leader.
+ unpauseLatch.countDown();
+
 + // The client call may fail with an IOException since the original SCM
 + // it sent the request to was restarted.
+ finalizationFuture.get();
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+ // Make sure old leader has caught up and all SCMs have finalized.
+ waitForScmsToFinalize(cluster.getStorageContainerManagersList());
+
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList(), 0, NUM_DATANODES);
+ TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes(), 0, CLOSED);
+ }
+
+ @ParameterizedTest
+ @MethodSource(METHOD_SOURCE)
+ public void testFinalizationWithRestart(
+ UpgradeTestInjectionPoints haltingPoint) throws Exception {
+ CountDownLatch terminateLatch = new CountDownLatch(1);
+ init(new OzoneConfiguration(),
+ UpgradeTestUtils.newTerminatingFinalizationExecutor(haltingPoint,
+ terminateLatch, LOG),
+ 0);
+ terminateLatch.await();
+
+ // Once upgrade finalization is stopped at the halting point, restart all
+ // SCMs.
+ LOG.info("Restarting all SCMs during upgrade finalization.");
+ // Restarting an SCM from mini ozone actually replaces the SCM with a new
+ // instance. We will use the normal upgrade finalization executor for
+ // these new instances, since the last one aborted at the halting point.
+ cluster.getSCMConfigurator()
+ .setUpgradeFinalizationExecutor(
+ new DefaultUpgradeFinalizationExecutor<>());
+ List<StorageContainerManager> originalSCMs =
+ cluster.getStorageContainerManagers();
+
+ for (StorageContainerManager scm: originalSCMs) {
+ cluster.restartStorageContainerManager(scm, false);
+ }
+
+ checkMidFinalizationConditions(haltingPoint,
+ cluster.getStorageContainerManagersList());
+
+ // After all SCMs were restarted, finalization should resume
+ // automatically once a leader is elected.
+ cluster.waitForClusterToBeReady();
+
+ finalizationFuture.get();
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+ // Once the leader tells the client finalization is complete, wait for all
+ // followers to catch up so we can check their state.
+ waitForScmsToFinalize(cluster.getStorageContainerManagersList());
+
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ cluster.getStorageContainerManagersList(), 0, NUM_DATANODES);
+ TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes(), 0, CLOSED);
+ }
+
+ @Test
+ public void testSnapshotFinalization() throws Exception {
+ int numInactiveSCMs = 1;
+ // Require snapshot installation after only a few transactions.
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_ENABLED, true);
+ conf.setInt(ScmConfigKeys.OZONE_SCM_HA_RAFT_LOG_PURGE_GAP, 5);
+ conf.setLong(ScmConfigKeys.OZONE_SCM_HA_RATIS_SNAPSHOT_THRESHOLD,
+ 5);
+
+ init(conf, new DefaultUpgradeFinalizationExecutor<>(), numInactiveSCMs);
+
+ GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+ .captureLogs(FinalizationStateManagerImpl.LOG);
+
+ StorageContainerManager inactiveScm = cluster.getInactiveSCM().next();
+ LOG.info("Inactive SCM node ID: {}", inactiveScm.getSCMNodeId());
+
+ List<StorageContainerManager> scms =
+ cluster.getStorageContainerManagersList();
+ List<StorageContainerManager> activeScms = new ArrayList<>();
+ for (StorageContainerManager scm : scms) {
+ if (!scm.getSCMNodeId().equals(inactiveScm.getSCMNodeId())) {
+ activeScms.add(scm);
+ }
+ }
+
+ // Wait for finalization from the client perspective.
+ finalizationFuture.get();
+ TestHddsUpgradeUtils.waitForFinalizationFromClient(scmClient, CLIENT_ID);
+ // Wait for two running SCMs to finish finalization.
+ waitForScmsToFinalize(activeScms);
+
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ activeScms, 0, NUM_DATANODES);
+ TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
+ cluster.getHddsDatanodes(), 0, CLOSED);
+
+ // Move SCM log index farther ahead to make sure a snapshot install
+ // happens on the restarted SCM.
+ for (int i = 0; i < 10; i++) {
+ ContainerWithPipeline container =
+ scmClient.allocateContainer(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, "owner");
+ scmClient.closeContainer(
+ container.getContainerInfo().getContainerID());
+ }
+
+ cluster.startInactiveSCM(inactiveScm.getSCMNodeId());
+ waitForScmToFinalize(inactiveScm);
+
+ TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
+ inactiveScm, 0, NUM_DATANODES);
+
+ // Use log to verify a snapshot was installed.
+ Assertions.assertTrue(logCapture.getOutput().contains("New SCM snapshot " +
+ "received with metadata layout version"));
+ }
+
+ private void waitForScmsToFinalize(Collection<StorageContainerManager> scms)
+ throws Exception {
+ for (StorageContainerManager scm: scms) {
+ waitForScmToFinalize(scm);
+ }
+ }
+
+ private void waitForScmToFinalize(StorageContainerManager scm)
+ throws Exception {
+ GenericTestUtils.waitFor(() -> !scm.isInSafeMode(), 500, 5000);
+ GenericTestUtils.waitFor(() -> {
+ FinalizationCheckpoint checkpoint =
+ scm.getScmContext().getFinalizationCheckpoint();
+ LOG.info("Waiting for SCM {} (leader? {}) to finalize. Current " +
+ "finalization checkpoint is {}",
+ scm.getSCMNodeId(), scm.checkLeader(), checkpoint);
+ return checkpoint.hasCrossed(
+ FinalizationCheckpoint.FINALIZATION_COMPLETE);
+ }, 2_000, 60_000);
+ }
+
+ private void checkMidFinalizationConditions(
+ UpgradeTestInjectionPoints haltingPoint,
+ List<StorageContainerManager> scms) {
+ for (StorageContainerManager scm: scms) {
+ switch (haltingPoint) {
+ case BEFORE_PRE_FINALIZE_UPGRADE:
+ Assertions.assertFalse(
+ scm.getPipelineManager().isPipelineCreationFrozen());
+ Assertions.assertEquals(
+ scm.getScmContext().getFinalizationCheckpoint(),
+ FinalizationCheckpoint.FINALIZATION_REQUIRED);
+ break;
+ case AFTER_PRE_FINALIZE_UPGRADE:
+ Assertions.assertTrue(
+ scm.getPipelineManager().isPipelineCreationFrozen());
+ Assertions.assertEquals(
+ scm.getScmContext().getFinalizationCheckpoint(),
+ FinalizationCheckpoint.FINALIZATION_STARTED);
+ break;
+ case AFTER_COMPLETE_FINALIZATION:
+ Assertions.assertFalse(
+ scm.getPipelineManager().isPipelineCreationFrozen());
+ Assertions.assertEquals(
+ scm.getScmContext().getFinalizationCheckpoint(),
+ FinalizationCheckpoint.MLV_EQUALS_SLV);
+ break;
+ case AFTER_POST_FINALIZE_UPGRADE:
+ Assertions.assertFalse(
+ scm.getPipelineManager().isPipelineCreationFrozen());
+ Assertions.assertEquals(
+ scm.getScmContext().getFinalizationCheckpoint(),
+ FinalizationCheckpoint.FINALIZATION_COMPLETE);
+ break;
+ default:
+ Assertions.fail("Unknown halting point in test: " + haltingPoint);
+ }
+ }
+ }
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 8e8de6df6a..1bdaef3a9b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone;
import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
import
org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -115,6 +115,7 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
private OzoneConfiguration conf;
+ private final SCMConfigurator scmConfigurator;
private StorageContainerManager scm;
private OzoneManager ozoneManager;
private final List<HddsDatanodeService> hddsDatanodes;
@@ -124,27 +125,13 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
private int waitForClusterToBeReadyTimeout = 120000; // 2 min
private CertificateClient caClient;
- /**
- * Creates a new MiniOzoneCluster.
- *
- * @throws IOException if there is an I/O error
- */
- protected MiniOzoneClusterImpl(OzoneConfiguration conf,
- OzoneManager ozoneManager,
- StorageContainerManager scm,
- List<HddsDatanodeService> hddsDatanodes) {
- this.conf = conf;
- this.ozoneManager = ozoneManager;
- this.scm = scm;
- this.hddsDatanodes = hddsDatanodes;
- }
-
/**
* Creates a new MiniOzoneCluster with Recon.
*
* @throws IOException if there is an I/O error
*/
MiniOzoneClusterImpl(OzoneConfiguration conf,
+ SCMConfigurator scmConfigurator,
OzoneManager ozoneManager,
StorageContainerManager scm,
List<HddsDatanodeService> hddsDatanodes,
@@ -154,6 +141,7 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
this.scm = scm;
this.hddsDatanodes = hddsDatanodes;
this.reconServer = reconServer;
+ this.scmConfigurator = scmConfigurator;
}
/**
@@ -165,13 +153,18 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
* @param conf
* @param hddsDatanodes
*/
- MiniOzoneClusterImpl(OzoneConfiguration conf,
+ MiniOzoneClusterImpl(OzoneConfiguration conf, SCMConfigurator
scmConfigurator,
List<HddsDatanodeService> hddsDatanodes, ReconServer reconServer) {
+ this.scmConfigurator = scmConfigurator;
this.conf = conf;
this.hddsDatanodes = hddsDatanodes;
this.reconServer = reconServer;
}
+ public SCMConfigurator getSCMConfigurator() {
+ return scmConfigurator;
+ }
+
@Override
public OzoneConfiguration getConf() {
return conf;
@@ -342,11 +335,6 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
@Override
public StorageContainerLocationProtocolClientSideTranslatorPB
getStorageContainerLocationClient() throws IOException {
- InetSocketAddress address = scm.getClientRpcAddress();
- LOG.info(
- "Creating StorageContainerLocationProtocol RPC client with address {}",
- address);
-
SCMContainerLocationFailoverProxyProvider proxyProvider =
new SCMContainerLocationFailoverProxyProvider(conf, null);
@@ -361,7 +349,7 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
LOG.info("Restarting SCM in cluster " + this.getClass());
scm.stop();
scm.join();
- scm = HddsTestUtils.getScmSimple(conf);
+ scm = HddsTestUtils.getScmSimple(conf, scmConfigurator);
scm.start();
if (waitForDatanode) {
waitForClusterToBeReady();
@@ -612,7 +600,8 @@ public class MiniOzoneClusterImpl implements
MiniOzoneCluster {
hddsDatanodes = createHddsDatanodes(
Collections.singletonList(scm), reconServer);
- MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
+ MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf,
+ scmConfigurator, om, scm,
hddsDatanodes, reconServer);
cluster.setCAClient(certClient);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index 158cf1086f..60cb1a16c9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -94,12 +95,13 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
*/
public MiniOzoneHAClusterImpl(
OzoneConfiguration conf,
+ SCMConfigurator scmConfigurator,
OMHAService omhaService,
SCMHAService scmhaService,
List<HddsDatanodeService> hddsDatanodes,
String clusterPath,
ReconServer reconServer) {
- super(conf, hddsDatanodes, reconServer);
+ super(conf, scmConfigurator, hddsDatanodes, reconServer);
this.omhaService = omhaService;
this.scmhaService = scmhaService;
this.clusterMetaPath = clusterPath;
@@ -249,23 +251,25 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
LOG.info("Shutting down StorageContainerManager " + scm.getScmId());
scm.stop();
- scmhaService.deactivate(scm);
+ scmhaService.removeInstance(scm);
}
- public void restartStorageContainerManager(
+ public StorageContainerManager restartStorageContainerManager(
StorageContainerManager scm, boolean waitForSCM)
throws IOException, TimeoutException,
InterruptedException, AuthenticationException {
LOG.info("Restarting SCM in cluster " + this.getClass());
+ scmhaService.removeInstance(scm);
OzoneConfiguration scmConf = scm.getConfiguration();
shutdownStorageContainerManager(scm);
scm.join();
- scm = HddsTestUtils.getScmSimple(scmConf);
- scmhaService.activate(scm);
+ scm = HddsTestUtils.getScmSimple(scmConf, getSCMConfigurator());
+ scmhaService.addInstance(scm, true);
scm.start();
if (waitForSCM) {
waitForClusterToBeReady();
}
+ return scm;
}
public String getClusterId() throws IOException {
@@ -433,7 +437,8 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
scmService.getActiveServices(), reconServer);
MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf,
- omService, scmService, hddsDatanodes, path, reconServer);
+ scmConfigurator, omService, scmService, hddsDatanodes, path,
+ reconServer);
if (startDataNodes) {
cluster.startHddsDatanodes();
@@ -583,7 +588,8 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
} else {
StorageContainerManager.scmBootstrap(scmConfig);
}
- StorageContainerManager scm =
HddsTestUtils.getScmSimple(scmConfig);
+ StorageContainerManager scm =
+ HddsTestUtils.getScmSimple(scmConfig, scmConfigurator);
HealthyPipelineSafeModeRule rule =
scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule();
if (rule != null) {
@@ -977,7 +983,11 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
}
public boolean removeInstance(Type t) {
- return services.remove(t);
+ boolean result = services.remove(t);
+ serviceMap.remove(serviceIdProvider.apply(t));
+ activeServices.remove(t);
+ inactiveServices.remove(t);
+ return result;
}
public void addInstance(Type t, boolean isActive) {
@@ -1053,7 +1063,7 @@ public class MiniOzoneHAClusterImpl extends
MiniOzoneClusterImpl {
}
public List<StorageContainerManager> getStorageContainerManagers() {
- return this.scmhaService.getServices();
+ return new ArrayList<>(this.scmhaService.getServices());
}
public StorageContainerManager getStorageContainerManager() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]