This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3698-nonrolling-upgrade
by this push:
new 7266f32 HDDS-4914. Failure injection and validating HDDS upgrade.
(#1998)
7266f32 is described below
commit 7266f322c56a7b731996d541a856d468d5c42b2a
Author: prashantpogde <[email protected]>
AuthorDate: Wed Apr 14 07:48:09 2021 -0700
HDDS-4914. Failure injection and validating HDDS upgrade. (#1998)
---
.../ozone/upgrade/BasicUpgradeFinalizer.java | 34 +-
.../DefaultUpgradeFinalizationExecutor.java | 78 ++
.../hadoop/ozone/upgrade/UpgradeFinalizer.java | 8 +-
.../InjectedUpgradeFinalizationExecutor.java | 133 ++++
.../hadoop/ozone/upgrade/TestUpgradeFinalizer.java | 16 +
.../common/statemachine/DatanodeStateMachine.java | 13 +-
.../upgrade/DataNodeUpgradeFinalizer.java | 82 +--
.../scm/pipeline/BackgroundPipelineCreator.java | 1 +
.../hdds/scm/server/StorageContainerManager.java | 6 +-
.../scm/server/upgrade/SCMUpgradeFinalizer.java | 84 +--
.../hadoop/hdds/upgrade/TestHDDSUpgrade.java | 805 ++++++++++++++++++++-
.../ozone/om/upgrade/OMUpgradeFinalizer.java | 83 +--
12 files changed, 1160 insertions(+), 183 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
index 04ffead..1c0e3d3 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -57,18 +57,44 @@ public abstract class BasicUpgradeFinalizer
protected V versionManager;
protected String clientID;
protected T component;
+ protected DefaultUpgradeFinalizationExecutor finalizationExecutor;
private Queue<String> msgs = new ConcurrentLinkedQueue<>();
protected boolean isDone = false;
public BasicUpgradeFinalizer(V versionManager) {
this.versionManager = versionManager;
+ this.finalizationExecutor =
+ new DefaultUpgradeFinalizationExecutor();
+ }
+
+ /**
+ * Sets the Finalization Executor driver.
+ * @param executor FinalizationExecutor.
+ */
+
+ public void setFinalizationExecutor(DefaultUpgradeFinalizationExecutor
+ executor) {
+ finalizationExecutor = executor;
+ }
+
+ @Override
+ public DefaultUpgradeFinalizationExecutor getFinalizationExecutor() {
+ return finalizationExecutor;
}
public boolean isFinalizationDone() {
return isDone;
}
+ public void markFinalizationDone() {
+ isDone = true;
+ }
+
+ public V getVersionManager() {
+ return versionManager;
+ }
+
public synchronized StatusAndMessages preFinalize(String upgradeClientID,
T id)
throws UpgradeException {
@@ -182,7 +208,6 @@ public abstract class BasicUpgradeFinalizer
|| status.equals(FINALIZATION_DONE);
}
-
protected void finalizeFeature(LayoutFeature feature, Storage config,
Optional<? extends UpgradeAction> action)
throws UpgradeException {
@@ -436,4 +461,11 @@ public abstract class BasicUpgradeFinalizer
protected void updateLayoutVersionInDB(V vm, T comp) throws IOException {
throw new UnsupportedOperationException();
}
+
+ protected abstract void postFinalizeUpgrade() throws IOException;
+
+ protected abstract void finalizeUpgrade(Storage storageConfig)
+ throws UpgradeException;
+
+ protected abstract boolean preFinalizeUpgrade() throws IOException;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
new file mode 100644
index 0000000..14c1f2d
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/DefaultUpgradeFinalizationExecutor.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DefaultUpgradeFinalizationExecutor drives the main part of finalization:
+ * the component's pre-finalize checks, finalization of pending layout
+ * features, and the component's post-finalize actions.
+ * Unit/Integration tests can override this to provide an error-injected
+ * version of this class.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class DefaultUpgradeFinalizationExecutor {
+  static final Logger LOG =
+      LoggerFactory.getLogger(DefaultUpgradeFinalizationExecutor.class);
+
+  public DefaultUpgradeFinalizationExecutor() {
+  }
+
+  /**
+   * Runs the finalization steps of the given finalizer in order.
+   *
+   * @param storageConfig storage whose layout version is being finalized.
+   * @param basicUpgradeFinalizer component-specific finalizer supplying the
+   *     pre-finalize, finalize and post-finalize steps.
+   * @return always {@code null}.
+   * @throws Exception the failure raised by any step. It is rethrown only
+   *     while the version manager still reports that finalization is
+   *     needed, after resetting the upgrade state to FINALIZATION_REQUIRED.
+   */
+  public Void execute(Storage storageConfig,
+      BasicUpgradeFinalizer<?, ?> basicUpgradeFinalizer)
+      throws Exception {
+    try {
+      basicUpgradeFinalizer.emitStartingMsg();
+      basicUpgradeFinalizer.getVersionManager()
+          .setUpgradeState(FINALIZATION_IN_PROGRESS);
+
+      // Component-specific readiness checks. A false result means the
+      // component is not ready to finalize yet, so stop without finalizing.
+      if (!basicUpgradeFinalizer.preFinalizeUpgrade()) {
+        return null;
+      }
+
+      basicUpgradeFinalizer.finalizeUpgrade(storageConfig);
+      basicUpgradeFinalizer.postFinalizeUpgrade();
+      basicUpgradeFinalizer.emitFinishedMsg();
+      return null;
+    } catch (Exception e) {
+      // Pass the exception to the logger so the stack trace reaches the
+      // service log rather than stderr.
+      LOG.warn("Upgrade Finalization failed with following Exception:", e);
+      if (basicUpgradeFinalizer.getVersionManager().needsFinalization()) {
+        basicUpgradeFinalizer.getVersionManager()
+            .setUpgradeState(FINALIZATION_REQUIRED);
+        throw e;
+      }
+      // The version manager no longer needs finalization (presumably the
+      // failure happened after features were finalized, e.g. during
+      // post-finalize), so the failure is logged but not propagated.
+      return null;
+    } finally {
+      basicUpgradeFinalizer.markFinalizationDone();
+    }
+  }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
index ffa6e06..59387b3 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
@@ -64,7 +64,7 @@ public interface UpgradeFinalizer<T> {
STARTING_FINALIZATION,
FINALIZATION_IN_PROGRESS,
FINALIZATION_DONE,
- FINALIZATION_REQUIRED
+ FINALIZATION_REQUIRED,
}
/**
@@ -72,7 +72,6 @@ public interface UpgradeFinalizer<T> {
* ongoing, the messages that should be passed to the initiating client of
* finalization.
* This translates to a counterpart in the RPC layer.
- * @see UpgradeFinalizationStatus in OMClientProtocol.proto
*/
class StatusAndMessages {
private Status status;
@@ -187,4 +186,9 @@ public interface UpgradeFinalizer<T> {
*/
void runPrefinalizeStateActions(Storage storage, T service)
throws IOException;
+
+ /**
+ * get the Finalization Executor driver.
+ */
+ DefaultUpgradeFinalizationExecutor getFinalizationExecutor();
}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
new file mode 100644
index 0000000..6b512b6
--- /dev/null
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/InjectedUpgradeFinalizationExecutor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Failure injected extension of DefaultUpgradeFinalizationExecutor that
+ * can be used by Unit/Integration Tests.
+ *
+ * <p>A test registers a {@link Callable} for one
+ * {@link UpgradeTestInjectionPoints} value via
+ * {@link #configureTestInjectionFunction}. When finalization reaches that
+ * point and the callable returns {@code true}, finalization is aborted.
+ * Unlike the default executor, this executor does not rethrow failures:
+ * they are logged and, if finalization is still needed, the upgrade state
+ * is reset to FINALIZATION_REQUIRED.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class InjectedUpgradeFinalizationExecutor extends
+    DefaultUpgradeFinalizationExecutor {
+  static final Logger LOG =
+      LoggerFactory.getLogger(InjectedUpgradeFinalizationExecutor.class);
+
+  private Callable<Boolean> injectTestFunction;
+  private UpgradeTestInjectionPoints testInjectionPoint;
+
+  /**
+   * Points in the finalization sequence at which a test function can run.
+   */
+  public enum UpgradeTestInjectionPoints {
+    BEFORE_PRE_FINALIZE_UPGRADE(1),
+    AFTER_PRE_FINALIZE_UPGRADE(2),
+    AFTER_COMPLETE_FINALIZATION(4),
+    AFTER_POST_FINALIZE_UPGRADE(5);
+
+    private final int val;
+
+    UpgradeTestInjectionPoints(int value) {
+      val = value;
+    }
+
+    public int getValue() {
+      return val;
+    }
+  }
+
+  /**
+   * Raised when an injected test function asks finalization to stop.
+   */
+  static class UpgradeTestInjectionAbort extends Exception {
+    UpgradeTestInjectionAbort() {
+    }
+  }
+
+  /**
+   * Runs the same finalization steps as the default executor, calling the
+   * configured test function at each injection point.
+   *
+   * @param storageConfig storage whose layout version is being finalized.
+   * @param basicUpgradeFinalizer component-specific finalizer.
+   * @return always {@code null}; failures in the steps are logged rather
+   *     than rethrown.
+   */
+  @Override
+  public Void execute(Storage storageConfig,
+      BasicUpgradeFinalizer basicUpgradeFinalizer)
+      throws Exception {
+    try {
+      injectTestFunctionAtThisPoint(BEFORE_PRE_FINALIZE_UPGRADE);
+      basicUpgradeFinalizer.emitStartingMsg();
+      basicUpgradeFinalizer.getVersionManager()
+          .setUpgradeState(FINALIZATION_IN_PROGRESS);
+
+      if (!basicUpgradeFinalizer.preFinalizeUpgrade()) {
+        return null;
+      }
+      injectTestFunctionAtThisPoint(AFTER_PRE_FINALIZE_UPGRADE);
+
+      basicUpgradeFinalizer.finalizeUpgrade(storageConfig);
+      injectTestFunctionAtThisPoint(AFTER_COMPLETE_FINALIZATION);
+
+      basicUpgradeFinalizer.postFinalizeUpgrade();
+      injectTestFunctionAtThisPoint(AFTER_POST_FINALIZE_UPGRADE);
+
+      basicUpgradeFinalizer.emitFinishedMsg();
+    } catch (Exception e) {
+      // Deliberately not rethrown (unlike the default executor): an
+      // injected abort leaves the component in its partially-finalized
+      // state for the test to examine.
+      LOG.warn("Upgrade Finalization failed with following Exception:", e);
+      if (basicUpgradeFinalizer.getVersionManager().needsFinalization()) {
+        basicUpgradeFinalizer.getVersionManager()
+            .setUpgradeState(FINALIZATION_REQUIRED);
+      }
+    } finally {
+      basicUpgradeFinalizer.markFinalizationDone();
+    }
+    return null;
+  }
+
+  /**
+   * Registers the function to call at a single injection point, replacing
+   * any previously configured function and point.
+   *
+   * @param pointIndex point in the finalization sequence at which to call
+   *     the function.
+   * @param injectedTestFunction function called when execution reaches
+   *     {@code pointIndex}; returning {@code true} aborts finalization.
+   */
+  public void configureTestInjectionFunction(
+      UpgradeTestInjectionPoints pointIndex,
+      Callable<Boolean> injectedTestFunction) {
+    injectTestFunction = injectedTestFunction;
+    testInjectionPoint = pointIndex;
+  }
+
+  /**
+   * Calls the configured test function if {@code pointIndex} is the
+   * configured injection point.
+   *
+   * @param pointIndex injection point that execution has just reached.
+   * @throws UpgradeTestInjectionAbort if the test function returns
+   *     {@code true}, signalling that finalization must stop here.
+   * @throws Exception any exception thrown by the test function itself.
+   */
+  public void injectTestFunctionAtThisPoint(
+      UpgradeTestInjectionPoints pointIndex) throws Exception {
+    if (testInjectionPoint != null && pointIndex == testInjectionPoint
+        && injectTestFunction != null && injectTestFunction.call()) {
+      throw new UpgradeTestInjectionAbort();
+    }
+  }
+}
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
index 1a0fefd..a6d6d57 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestUpgradeFinalizer.java
@@ -130,6 +130,22 @@ public class TestUpgradeFinalizer {
}
@Override
+ protected void postFinalizeUpgrade() throws IOException {
+ return;
+ }
+
+ @Override
+ protected void finalizeUpgrade(Storage storageConfig)
+ throws UpgradeException {
+ return;
+ }
+
+ @Override
+ protected boolean preFinalizeUpgrade() throws IOException {
+ return false;
+ }
+
+ @Override
public void runPrefinalizeStateActions(Storage storage,
MockComponent mockComponent)
throws IOException {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 029899b..45cf48e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -60,6 +61,7 @@ import
org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer;
import org.apache.hadoop.ozone.container.upgrade.DatanodeMetadataFeatures;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.Time;
@@ -601,8 +603,7 @@ public class DatanodeStateMachine implements Closeable {
return layoutStorage;
}
- @VisibleForTesting
- public boolean canFinalizeDataNode() {
+ private boolean canFinalizeDataNode() {
// Lets be sure that we do not have any open container before we return
// from here. This function should be called in its own finalizer thread
// context.
@@ -610,10 +611,13 @@ public class DatanodeStateMachine implements Closeable {
getContainer().getController().getContainers();
while (containerIt.hasNext()) {
Container ctr = containerIt.next();
- switch (ctr.getContainerState()) {
+ State state = ctr.getContainerState();
+ switch (state) {
case OPEN:
case CLOSING:
case UNHEALTHY:
+ LOG.warn("FinalizeUpgrade : Waiting for container to close, current " +
+ "state is: {}", state);
return false;
default:
continue;
@@ -642,4 +646,7 @@ public class DatanodeStateMachine implements Closeable {
return upgradeFinalizer.reportStatus(datanodeDetails.getUuidString(),
false);
}
+ public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
+ return upgradeFinalizer;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
index df4ebde..f4e7026 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DataNodeUpgradeFinalizer.java
@@ -19,13 +19,10 @@
package org.apache.hadoop.ozone.container.upgrade;
import static
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -33,12 +30,18 @@ import org.apache.hadoop.ozone.common.Storage;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* UpgradeFinalizer for the DataNode.
*/
public class DataNodeUpgradeFinalizer extends
BasicUpgradeFinalizer<DatanodeStateMachine, HDDSLayoutVersionManager> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataNodeUpgradeFinalizer.class);
+ private DatanodeStateMachine datanodeStateMachine;
public DataNodeUpgradeFinalizer(HDDSLayoutVersionManager versionManager,
String optionalClientID) {
@@ -50,54 +53,51 @@ public class DataNodeUpgradeFinalizer extends
public StatusAndMessages finalize(String upgradeClientID,
DatanodeStateMachine dsm)
throws IOException {
+ datanodeStateMachine = dsm;
StatusAndMessages response = preFinalize(upgradeClientID, dsm);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
}
- new Worker(dsm).call();
+ try {
+ getFinalizationExecutor().execute(dsm.getLayoutStorage(),
+ this);
+ } catch (Exception e) {
+ // The finalization executor logs the failure before rethrowing it;
+ // keep it as the cause so the original stack trace is not lost.
+ throw new IOException(e.getMessage(), e);
+ }
return STARTING_MSG;
}
- private class Worker implements Callable<Void> {
- private DatanodeStateMachine datanodeStateMachine;
-
- /**
- * Initiates the Worker, for the specified DataNode instance.
- * @param dsm the DataNodeStateMachine instance on which to finalize the
- * new LayoutFeatures.
- */
- Worker(DatanodeStateMachine dsm) {
- datanodeStateMachine = dsm;
+ @Override
+ public boolean preFinalizeUpgrade() throws IOException {
+ if(!datanodeStateMachine.preFinalizeUpgrade()) {
+ // DataNode is not yet ready to finalize.
+ // Reset the Finalization state.
+ versionManager.setUpgradeState(FINALIZATION_REQUIRED);
+ String msg = "Pre Finalization checks failed on the DataNode.";
+ logAndEmit(msg);
+ return false;
}
+ return true;
+ }
- @Override
- public Void call() throws IOException {
- if(!datanodeStateMachine.preFinalizeUpgrade()) {
- // datanode is not yet ready to finalize.
- // Reset the Finalization state.
- versionManager.setUpgradeState(FINALIZATION_REQUIRED);
- return null;
- }
- try {
- emitStartingMsg();
- versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
- for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
- Optional<? extends UpgradeAction> action =
- f.datanodeAction(ON_FINALIZE);
- finalizeFeature(f, datanodeStateMachine.getLayoutStorage(), action);
- updateLayoutVersionInVersionFile(f,
- datanodeStateMachine.getLayoutStorage());
- versionManager.finalized(f);
- }
- versionManager.completeFinalization();
- datanodeStateMachine.postFinalizeUpgrade();
- emitFinishedMsg();
- return null;
- } finally {
- versionManager.setUpgradeState(FINALIZATION_DONE);
- isDone = true;
- }
+ @Override
+ public void postFinalizeUpgrade() throws IOException {
+ datanodeStateMachine.postFinalizeUpgrade();
+ }
+
+ @Override
+ protected void finalizeUpgrade(Storage storageConfig)
+ throws UpgradeException {
+ for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+ Optional<? extends UpgradeAction> action =
+ f.datanodeAction(ON_FINALIZE);
+ finalizeFeature(f, datanodeStateMachine.getLayoutStorage(), action);
+ updateLayoutVersionInVersionFile(f,
+ datanodeStateMachine.getLayoutStorage());
+ versionManager.finalized(f);
}
+ versionManager.completeFinalization();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 7700070..ffaaccd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -117,6 +117,7 @@ class BackgroundPipelineCreator {
// TODO: #CLUTIL Different replication factor may need to be supported
if(pausePipelineCreation.get()) {
+ LOG.info("Pipeline Creation is paused.");
return;
}
HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index f6d4eb9..d88745b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1778,4 +1778,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
) throws IOException {
return upgradeFinalizer.reportStatus(upgradeClientID, takeover);
}
-}
\ No newline at end of file
+
+ public UpgradeFinalizer<StorageContainerManager> getUpgradeFinalizer() {
+ return upgradeFinalizer;
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
index 27dbb6b..1c8a3e4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -19,21 +19,18 @@
package org.apache.hadoop.hdds.scm.server.upgrade;
import static
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.Callable;
-import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
/**
* UpgradeFinalizer for the Storage Container Manager service.
@@ -41,6 +38,8 @@ import
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
public class SCMUpgradeFinalizer extends
BasicUpgradeFinalizer<StorageContainerManager, HDDSLayoutVersionManager> {
+ private StorageContainerManager storageContainerManager;
+
public SCMUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
super(versionManager);
}
@@ -49,59 +48,52 @@ public class SCMUpgradeFinalizer extends
public StatusAndMessages finalize(String upgradeClientID,
StorageContainerManager scm)
throws IOException {
+ storageContainerManager = scm;
StatusAndMessages response = preFinalize(upgradeClientID, scm);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
}
- new Worker(scm).call();
+ try {
+ getFinalizationExecutor().execute(scm.getScmStorageConfig(), this);
+ } catch (Exception e) {
+ // The finalization executor logs the failure before rethrowing it;
+ // keep it as the cause so the original stack trace is not lost.
+ throw new IOException(e.getMessage(), e);
+ }
return STARTING_MSG;
}
- private class Worker implements Callable<Void> {
- private StorageContainerManager scm;
-
- /**
- * Initiates the Worker, for the specified SCM instance.
- * @param scm the StorageContainerManager instance on which to finalize the
- * new LayoutFeatures.
+ @Override
+ public boolean preFinalizeUpgrade() throws IOException {
+ /*
+ * Before we can call finalize the feature, we need to make sure that
+ * all existing pipelines are closed and pipeline Manger would freeze
+ * all new pipeline creation.
*/
- Worker(StorageContainerManager scm) {
- this.scm = scm;
- }
+ String msg = " Existing pipelines and containers will be closed " +
+ "during Upgrade.";
+ msg += "\n New pipelines creation will remain frozen until Upgrade " +
+ "is finalized.";
- @Override
- public Void call() throws IOException {
- try {
- emitStartingMsg();
- versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
- /*
- * Before we can call finalize the feature, we need to make sure that
- * all existing pipelines are closed and pipeline Manger would freeze
- * all new pipeline creation.
- */
- String msg = "Existing pipelines and containers will be closed " +
- "during upgrade.";
- msg += "\nNew pipelines creation will remain frozen until upgrade " +
- "is finalized.";
- scm.preFinalizeUpgrade();
- logAndEmit(msg);
- SCMStorageConfig storage = scm.getScmStorageConfig();
+ storageContainerManager.preFinalizeUpgrade();
+ logAndEmit(msg);
+ return true;
+ }
- for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
- Optional<? extends UpgradeAction> action = f.scmAction(ON_FINALIZE);
- finalizeFeature(f, storage, action);
- updateLayoutVersionInVersionFile(f, storage);
- versionManager.finalized(f);
- }
- versionManager.completeFinalization();
- scm.postFinalizeUpgrade();
- emitFinishedMsg();
- return null;
- } finally {
- versionManager.setUpgradeState(FINALIZATION_DONE);
- isDone = true;
- }
+ @Override
+ protected void finalizeUpgrade(Storage storageConfig)
+ throws UpgradeException {
+ for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+ Optional<? extends UpgradeAction> action = f.scmAction(ON_FINALIZE);
+ finalizeFeature(f, storageConfig, action);
+ updateLayoutVersionInVersionFile(f, storageConfig);
+ versionManager.finalized(f);
}
+ versionManager.completeFinalization();
+ }
+
+ @Override
+ public void postFinalizeUpgrade() throws IOException {
+ // The finalization executor calls emitFinishedMsg() once this returns,
+ // so it must not be emitted here as well.
+ storageContainerManager.postFinalizeUpgrade();
}
@Override
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
index b5543d1..5c79d7a 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/upgrade/TestHDDSUpgrade.java
@@ -18,19 +18,29 @@
package org.apache.hadoop.hdds.upgrade;
+import static java.lang.Thread.sleep;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.INITIAL_VERSION;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_COMPLETE_FINALIZATION;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_POST_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.AFTER_PRE_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.BEFORE_PRE_FINALIZE_UPGRADE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -38,6 +48,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -50,6 +61,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -59,14 +71,22 @@ import
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
+import org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor;
+import
org.apache.hadoop.ozone.upgrade.InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
+import
org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -79,10 +99,10 @@ import org.slf4j.LoggerFactory;
public class TestHDDSUpgrade {
/**
- * Set a timeout for each test.
- */
+ * Set a timeout for each test.
+ */
@Rule
- public Timeout timeout = new Timeout(300000);
+ public Timeout timeout = new Timeout(11000000);
private static final Logger LOG =
LoggerFactory.getLogger(TestHDDSUpgrade.class);
private static final int NUM_DATA_NODES = 3;
@@ -92,9 +112,9 @@ public class TestHDDSUpgrade {
private StorageContainerManager scm;
private ContainerManagerV2 scmContainerManager;
private PipelineManager scmPipelineManager;
- private Pipeline ratisPipeline1;
private final int numContainersCreated = 1;
private HDDSLayoutVersionManager scmVersionManager;
+ private AtomicBoolean testPassed = new AtomicBoolean(true);
/**
* Create a MiniDFSCluster for testing.
@@ -102,39 +122,75 @@ public class TestHDDSUpgrade {
* @throws IOException
*/
@Before
+  public void setUp() throws Exception {
+    // Every test starts from a freshly built cluster.
+    this.init();
+  }
+
+  /**
+   * Shuts the cluster down after every test, including tests that rebuild
+   * the cluster several times while they run.
+   */
+  @After
+  public void tearDown() throws Exception {
+    this.shutdown();
+  }
+
+  /**
+   * Builds and starts a MiniOzoneCluster with NUM_DATA_NODES datanodes, with
+   * SCM and every datanode starting at INITIAL_VERSION's layout version,
+   * waits for the cluster to be ready, and caches the SCM handles through
+   * loadSCMState(). Tests that rebuild the cluster between iterations call
+   * this directly after shutdown().
+   *
+   * @throws Exception if the cluster cannot be built or does not become ready
+   */
public void init() throws Exception {
conf = new OzoneConfiguration();
conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1000,
- TimeUnit.MILLISECONDS);
-
+ TimeUnit.MILLISECONDS);
conf.set(OZONE_DATANODE_PIPELINE_LIMIT, "1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(NUM_DATA_NODES)
// allow only one FACTOR THREE pipeline.
.setTotalPipelineNumLimit(NUM_DATA_NODES + 1)
- .setHbInterval(1000)
- .setHbProcessorInterval(1000)
+ .setHbInterval(500)
+ .setHbProcessorInterval(500)
.setScmLayoutVersion(INITIAL_VERSION.layoutVersion())
.setDnLayoutVersion(INITIAL_VERSION.layoutVersion())
.build();
cluster.waitForClusterToBeReady();
- scm = cluster.getStorageContainerManager();
- scmContainerManager = scm.getContainerManager();
- scmPipelineManager = scm.getPipelineManager();
- scmVersionManager = scm.getLayoutVersionManager();
-
+ loadSCMState();
}
/**
* Shutdown MiniDFSCluster.
*/
- @After
public void shutdown() {
+    // cluster is null only if no cluster has been built yet, e.g. the very
+    // first init() failed before build() returned.
+    // NOTE(review): the field is not cleared after shutdown, so a later call
+    // (e.g. tearDown after a failed re-init in a loop test) shuts the same
+    // cluster down again; confirm that is harmless.
if (cluster != null) {
cluster.shutdown();
}
}
+  /**
+   * Re-reads the SCM handle and its container, pipeline and layout-version
+   * managers from the cluster. Tests that restart SCM or rebuild the cluster
+   * call this so that the cached fields do not refer to a stale instance.
+   */
+  private void loadSCMState() {
+    StorageContainerManager latestScm = cluster.getStorageContainerManager();
+    scm = latestScm;
+    scmContainerManager = latestScm.getContainerManager();
+    scmPipelineManager = latestScm.getPipelineManager();
+    scmVersionManager = latestScm.getLayoutVersionManager();
+  }
+
+
+  /**
+   * Writes a small RATIS/THREE key into a new volume and bucket (both named
+   * "testhddsupgrade"), so the cluster holds user data while it upgrades.
+   *
+   * @throws IOException if the client cannot be created or any write fails
+   */
+  private void createKey() throws IOException {
+    final String uniqueId = "testhddsupgrade";
+    // Close the client and the key stream even when a write fails, so the
+    // repeated cluster rebuilds in these tests do not leak RPC resources.
+    try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
+      ObjectStore objectStore = client.getObjectStore();
+      objectStore.createVolume(uniqueId);
+      objectStore.getVolume(uniqueId).createBucket(uniqueId);
+      try (OzoneOutputStream key =
+          objectStore.getVolume(uniqueId).getBucket(uniqueId)
+              .createKey(uniqueId, 1024, ReplicationType.RATIS,
+                  ReplicationFactor.THREE, new HashMap<>())) {
+        key.write(uniqueId.getBytes(UTF_8));
+        key.flush();
+      }
+    }
+  }
+
+ /*
+ * Helper function to test Pre-Upgrade conditions on the SCM
+ */
private void testPreUpgradeConditionsSCM() {
Assert.assertEquals(INITIAL_VERSION.layoutVersion(),
scmVersionManager.getMetadataLayoutVersion());
@@ -143,13 +199,29 @@ public class TestHDDSUpgrade {
}
}
+ /*
+ * Helper function to test Post-Upgrade conditions on the SCM
+ */
private void testPostUpgradeConditionsSCM() {
+ loadSCMState();
Assert.assertEquals(scmVersionManager.getSoftwareLayoutVersion(),
scmVersionManager.getMetadataLayoutVersion());
Assert.assertTrue(scmVersionManager.getMetadataLayoutVersion() >= 1);
// SCM should not return from finalization until there is at least one
// pipeline to use.
+ try {
+ GenericTestUtils.waitFor(() -> {
+ int pipelineCount = scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
+ .size();
+ if (pipelineCount >= 1) {
+ return true;
+ }
+ return false;
+ }, 500, 60000);
+ } catch (TimeoutException | InterruptedException e) {
+ Assert.fail("Timeout waiting for Upgrade to complete on SCM.");
+ }
int pipelineCount = scmPipelineManager.getPipelines(RATIS, THREE, OPEN)
.size();
Assert.assertTrue(pipelineCount >= 1);
@@ -157,26 +229,32 @@ public class TestHDDSUpgrade {
// SCM will not return from finalization until there is at least one
// RATIS 3 pipeline. For this to exist, all three of our datanodes must
// be in the HEALTHY state.
- testDataNodesStateOnSCM(HEALTHY);
+ testDataNodesStateOnSCM(HEALTHY, HEALTHY_READONLY);
int countContainers = 0;
for (ContainerInfo ci : scmContainerManager.getContainers()) {
HddsProtos.LifeCycleState ciState = ci.getState();
+ LOG.info("testPostUpgradeConditionsSCM: container state is {}",
+ ciState.name());
Assert.assertTrue((ciState == HddsProtos.LifeCycleState.CLOSED) ||
(ciState == HddsProtos.LifeCycleState.CLOSING) ||
+ (ciState == HddsProtos.LifeCycleState.DELETING) ||
+ (ciState == HddsProtos.LifeCycleState.DELETED) ||
(ciState == HddsProtos.LifeCycleState.QUASI_CLOSED));
countContainers++;
}
- Assert.assertEquals(numContainersCreated, countContainers);
+ Assert.assertTrue(countContainers >= numContainersCreated);
}
+ /*
+ * Helper function to test Pre-Upgrade conditions on all the DataNodes.
+ */
private void testPreUpgradeConditionsDataNodes() {
for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
HDDSLayoutVersionManager dnVersionManager =
dsm.getLayoutVersionManager();
Assert.assertEquals(0, dnVersionManager.getMetadataLayoutVersion());
-
}
int countContainers = 0;
@@ -194,14 +272,17 @@ public class TestHDDSUpgrade {
Assert.assertTrue(countContainers >= 1);
}
-
+ /*
+ * Helper function to test Post-Upgrade conditions on all the DataNodes.
+ */
private void testPostUpgradeConditionsDataNodes() {
try {
GenericTestUtils.waitFor(() -> {
for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
try {
- if (dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) {
+ if ((dsm.queryUpgradeStatus().status() != FINALIZATION_DONE) &&
+ (dsm.queryUpgradeStatus().status() != ALREADY_FINALIZED)) {
return false;
}
} catch (IOException e) {
@@ -209,7 +290,7 @@ public class TestHDDSUpgrade {
}
}
return true;
- }, 2000, 20000);
+ }, 500, 60000);
} catch (TimeoutException | InterruptedException e) {
Assert.fail("Timeout waiting for Upgrade to complete on Data Nodes.");
}
@@ -235,12 +316,36 @@ public class TestHDDSUpgrade {
Assert.assertTrue(countContainers >= 1);
}
- private void testDataNodesStateOnSCM(NodeState state) {
+  /**
+   * Verifies that SCM can create and open a new RATIS/THREE pipeline after
+   * the upgrade, and that a newly allocated RATIS/THREE container is placed
+   * on that pipeline.
+   *
+   * @throws IOException if pipeline creation or container allocation fails
+   *     (callers treat SCMException specially)
+   */
+  private void testPostUpgradePipelineCreation() throws IOException {
+    Pipeline ratisPipeline1 = scmPipelineManager.createPipeline(RATIS, THREE);
+    scmPipelineManager.openPipeline(ratisPipeline1.getId());
+    Assert.assertEquals(0,
+        scmPipelineManager.getNumberOfContainers(ratisPipeline1.getId()));
+    PipelineID pid = scmContainerManager.allocateContainer(RATIS, THREE,
+        "Owner1").getPipelineID();
+    // Check placement first, with the expected value first as JUnit expects,
+    // so a mismatch reports the two pipeline IDs rather than a confusing
+    // container count.
+    Assert.assertEquals(ratisPipeline1.getId(), pid);
+    Assert.assertEquals(1, scmPipelineManager.getNumberOfContainers(pid));
+  }
+
+ /*
+ * Helper function to test DataNode state on the SCM. Because of timing,
+ * a node sometimes transitions to the next state before it is checked, so
+ * this function accepts each DataNode being in NodeState "state" or
+ * "alternateState". A caller can require exactly one NodeState by passing
+ * "alternateState = null".
+ */
+ private void testDataNodesStateOnSCM(NodeState state,
+ NodeState alternateState) {
int countNodes = 0;
- for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()){
+ for (DatanodeDetails dn : scm.getScmNodeManager().getAllNodes()) {
try {
- Assert.assertEquals(state,
- scm.getScmNodeManager().getNodeStatus(dn).getHealth());
+ NodeState dnState =
+ scm.getScmNodeManager().getNodeStatus(dn).getHealth();
+ Assert.assertTrue((dnState == state) ||
+ (alternateState == null ? false : dnState == alternateState));
} catch (NodeNotFoundException e) {
e.printStackTrace();
Assert.fail("Node not found");
@@ -250,31 +355,43 @@ public class TestHDDSUpgrade {
Assert.assertEquals(NUM_DATA_NODES, countNodes);
}
+  /**
+   * Polls SCM via LambdaTestUtils.await(10000, 500, ...) (10 s timeout,
+   * 500 ms interval) until it reports exactly one OPEN RATIS/THREE pipeline.
+   *
+   * @throws Exception if the condition is not met before the timeout
+   */
private void waitForPipelineCreated() throws Exception {
- LambdaTestUtils.await(10000, 2000, () -> {
+ LambdaTestUtils.await(10000, 500, () -> {
List<Pipeline> pipelines =
scmPipelineManager.getPipelines(RATIS, THREE, OPEN);
return pipelines.size() == 1;
});
}
- @Test
- public void testFinalizationFromInitialVersionToLatestVersion()
- throws Exception {
-
- waitForPipelineCreated();
-
- // we will create CONTAINERS_CREATED_FOR_TESTING number of containers.
+  /**
+   * Allocates one RATIS/THREE container in SCM, opens the pipeline it was
+   * placed on, and creates that container on the datanodes through an
+   * Xceiver client acquired for the pipeline.
+   *
+   * NOTE(review): the XceiverClientManager created here appears never to be
+   * closed, and client1 is not released if createContainer throws; consider
+   * try-with-resources / try-finally.
+   *
+   * @throws IOException if allocation or container creation fails
+   */
private void createTestContainers() throws IOException {
XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
ContainerInfo ci1 = scmContainerManager.allocateContainer(
RATIS, THREE, "Owner1");
- ratisPipeline1 = scmPipelineManager.getPipeline(ci1.getPipelineID());
+ Pipeline ratisPipeline1 =
+ scmPipelineManager.getPipeline(ci1.getPipelineID());
scmPipelineManager.openPipeline(ratisPipeline1.getId());
XceiverClientSpi client1 =
xceiverClientManager.acquireClient(ratisPipeline1);
ContainerProtocolCalls.createContainer(client1,
ci1.getContainerID(), null);
xceiverClientManager.releaseClient(client1, false);
+ }
+
+ /*
+ * Happy Path Test Case.
+ */
+ @Test
+ public void testFinalizationFromInitialVersionToLatestVersion()
+ throws Exception {
+
+ waitForPipelineCreated();
+ createTestContainers();
// Test the Pre-Upgrade conditions on SCM as well as DataNodes.
testPreUpgradeConditionsSCM();
@@ -312,6 +429,9 @@ public class TestHDDSUpgrade {
// Verify Post-Upgrade conditions on the SCM.
testPostUpgradeConditionsSCM();
+ // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
+ testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+
// Verify the SCM has driven all the DataNodes through Layout Upgrade.
testPostUpgradeConditionsDataNodes();
@@ -322,5 +442,624 @@ public class TestHDDSUpgrade {
store.getVolume("vol1").createBucket("buc1");
store.getVolume("vol1").getBucket("buc1").createKey("key1", 100,
ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
+
+ }
+
+ /*
+ * All the subsequent tests here are failure cases. Some of them fail one
+ * or more nodes at the same time, at specific execution points and in
+ * different thread contexts.
+ * The key upgrade-path execution points are defined in
+ * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+ */
+
+  /*
+   * Helper function to inject an SCM failure and an SCM restart at a given
+   * execution point during SCM-Upgrade.
+   *
+   * Injects Failure in : SCM
+   * Executing-Thread-Context : SCM-Upgrade
+   *
+   * Returns true: the in-flight SCM upgrade is aborted by the restart, so a
+   * replacement SCM finalization is scheduled on a separate thread. Any
+   * failure of that thread is recorded in testPassed.
+   */
+  private Boolean injectSCMFailureDuringSCMUpgrade()
+      throws InterruptedException, TimeoutException, AuthenticationException,
+      IOException {
+    // For some tests this could get called in a different thread context.
+    // We need to guard concurrent updates to the cluster.
+    synchronized (cluster) {
+      cluster.restartStorageContainerManager(true);
+      loadSCMState();
+    }
+    // The ongoing SCM upgrade is aborted at this point, so schedule a new
+    // SCM finalization on a different thread context.
+    Thread t = new Thread(() -> {
+      try {
+        loadSCMState();
+        scm.finalizeUpgrade("xyz");
+      } catch (Exception e) {
+        // Catch everything: an uncaught (e.g. runtime) exception would only
+        // kill this helper thread, and the test would never notice.
+        LOG.error("Re-triggering SCM finalization after restart failed.", e);
+        testPassed.set(false);
+      }
+    }, "scm-refinalize-after-restart");
+    t.start();
+    return true;
+  }
+
+  /*
+   * Helper function to inject DataNode failures and DataNode restarts at a
+   * given execution point during SCM-Upgrade. It fails all the DataNodes in
+   * the cluster and is used by tests that simulate multi-node failure at
+   * specific code-execution points during SCM Upgrade.
+   * This helper should only be called in the thread context of an
+   * SCM-Upgrade. It returns false so the ongoing SCM upgrade is not aborted:
+   * the SCM node itself is not failed, so no new SCM finalization needs to
+   * be scheduled here.
+   *
+   * Injects Failure in : All the DataNodes
+   * Executing-Thread-Context : SCM-Upgrade
+   */
+  private Boolean injectDataNodeFailureDuringSCMUpgrade() {
+    try {
+      // Guard cluster modifications like the other injection helpers do,
+      // since other threads may also be modifying the cluster.
+      synchronized (cluster) {
+        // Iterate over a copy of the current DataNodes so the restarts do
+        // not disturb the list being iterated.
+        List<HddsDatanodeService> currentDataNodes =
+            new ArrayList<>(cluster.getHddsDatanodes());
+        for (HddsDatanodeService ds : currentDataNodes) {
+          cluster.restartHddsDatanode(ds.getDatanodeDetails(), false);
+        }
+        cluster.waitForClusterToBeReady();
+      }
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt for the SCM upgrade thread we are running in.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("DataNode Restarts Failed!", e);
+      testPassed.set(false);
+    }
+    loadSCMState();
+    // Returning false from the injection function lets the ongoing
+    // SCM-Upgrade-Finalization continue.
+    return false;
+  }
+
+  /*
+   * Helper function to inject a failure and restart of one specific
+   * DataNode. Because it targets a single DataNode, it can be invoked from
+   * that DataNode's own upgrade-finalization thread.
+   *
+   * Injects Failure in : Given DataNode
+   * Executing-Thread-Context : the same DataNode that we are failing here.
+   *
+   * Returns an unstarted thread; the caller starts it from the injection
+   * point and later joins it. Any restart failure is recorded in testPassed.
+   */
+  private Thread injectDataNodeFailureDuringDataNodeUpgrade(
+      DatanodeDetails dn) {
+    // The restart must run on its own thread, otherwise the DataNode restart
+    // will hang. Cluster modifications are guarded because the cluster may
+    // be modified in multiple independent threads.
+    return new Thread(() -> {
+      try {
+        synchronized (cluster) {
+          cluster.restartHddsDatanode(dn, true);
+        }
+      } catch (Exception e) {
+        LOG.error("Restart of DataNode {} failed.", dn, e);
+        testPassed.set(false);
+      }
+    });
+  }
+
+  /*
+   * Helper function to inject coordinated failures and restarts across all
+   * the DataNodes as well as SCM. This can be used to build targeted test
+   * cases that inject such comprehensive failures in the SCM-Upgrade-Context
+   * as well as the DataNode-Upgrade-Context.
+   *
+   * Injects Failure in : SCM as well as ALL the DataNodes.
+   * Executing-Thread-Context : Either the SCM-Upgrade-Finalizer or the
+   * DataNode-Upgrade-Finalizer.
+   *
+   * Returns an unstarted thread; the caller starts it from the injection
+   * point. Any restart failure is recorded in testPassed.
+   */
+  private Thread injectSCMAndDataNodeFailureTogetherAtTheSameTime() {
+    // This needs to happen in a separate thread context, otherwise the
+    // DataNode restart will hang.
+    return new Thread(() -> {
+      try {
+        // The cluster is modified from an independent thread, so synchronize
+        // on it to avoid concurrent modification.
+        synchronized (cluster) {
+          // Iterate over a copy of the current DataNodes so the restarts do
+          // not disturb the list being iterated.
+          List<HddsDatanodeService> currentDataNodes =
+              new ArrayList<>(cluster.getHddsDatanodes());
+          for (HddsDatanodeService ds : currentDataNodes) {
+            cluster.restartHddsDatanode(ds.getDatanodeDetails(), false);
+          }
+          cluster.restartStorageContainerManager(false);
+          cluster.waitForClusterToBeReady();
+        }
+      } catch (Exception e) {
+        LOG.error("Restarting SCM and all DataNodes together failed.", e);
+        testPassed.set(false);
+      }
+    });
+  }
+
+ /*
+ * We have various test cases to target single-node or multi-node failures
+ * below.
+ **/
+
+  /*
+   * Shared driver for the SCM-Upgrade-context failure tests below. It
+   * installs an injected finalization executor on the current SCM that fires
+   * at injectionPoint, runs the upgrade through
+   * testFinalizationWithFailuerInjectionHelper, and asserts that no
+   * background thread recorded a failure.
+   *
+   * failScm == true restarts SCM (aborting the in-flight SCM upgrade and
+   * re-triggering it); failScm == false restarts all DataNodes and lets the
+   * in-flight SCM upgrade continue.
+   */
+  private void runScmUpgradeWithFailureInjection(
+      UpgradeTestInjectionPoints injectionPoint, boolean failScm)
+      throws Exception {
+    testPassed.set(true);
+    InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+        new InjectedUpgradeFinalizationExecutor();
+    ((BasicUpgradeFinalizer) scm.getUpgradeFinalizer())
+        .setFinalizationExecutor(scmFinalizationExecutor);
+    scmFinalizationExecutor.configureTestInjectionFunction(injectionPoint,
+        () -> failScm
+            ? this.injectSCMFailureDuringSCMUpgrade()
+            : this.injectDataNodeFailureDuringSCMUpgrade());
+    testFinalizationWithFailuerInjectionHelper(null);
+    Assert.assertTrue(testPassed.get());
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test SCM failure During SCM Upgrade before execution point
+   * "PreFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresBeforeScmPreFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(BEFORE_PRE_FINALIZE_UPGRADE, true);
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test SCM failure During SCM Upgrade after execution point
+   * "PreFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmPreFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_PRE_FINALIZE_UPGRADE, true);
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test SCM failure During SCM Upgrade after execution point
+   * "CompleteFinalization". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmCompleteFinalization()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_COMPLETE_FINALIZATION, true);
+  }
+
+  /*
+   * One node(SCM) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test SCM failure During SCM Upgrade after execution point
+   * "PostFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testScmFailuresAfterScmPostFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_POST_FINALIZE_UPGRADE, true);
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test all DataNode failures During SCM Upgrade before execution point
+   * "PreFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresBeforeScmPreFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(BEFORE_PRE_FINALIZE_UPGRADE, false);
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test all DataNode failures During SCM Upgrade after execution point
+   * "PreFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmPreFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_PRE_FINALIZE_UPGRADE, false);
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test all DataNode failures During SCM Upgrade after execution point
+   * "CompleteFinalization". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmCompleteFinalization()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_COMPLETE_FINALIZATION, false);
+  }
+
+  /*
+   * Multi node(all DataNodes) failure case:
+   * Thread-Context : SCM-Upgrade
+   *
+   * Test all DataNode failures During SCM Upgrade after execution point
+   * "PostFinalizeUpgrade". All meaningful Upgrade execution points are
+   * defined in InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Test
+  public void testAllDataNodeFailuresAfterScmPostFinalizeUpgrade()
+      throws Exception {
+    runScmUpgradeWithFailureInjection(AFTER_POST_FINALIZE_UPGRADE, false);
+  }
+
+  /*
+   * Single node(targeted DataNode) failure case:
+   * Thread-Context : DataNode-Upgrade.
+   *
+   * Fail the same DataNode that is going through Upgrade-processing at a
+   * specific code execution point. This test covers all the meaningful
+   * Upgrade execution points as defined in
+   * InjectedUpgradeFinalizationExecutor.UpgradeTestInjectionPoints.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeFailuresDuringDataNodeUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      // Configure a given data node to fail itself when its corresponding
+      // Upgrade-Finalizer reaches a specific point in its execution.
+      HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+      Thread failureInjectionThread =
+          injectDataNodeFailureDuringDataNodeUpgrade(ds.getDatanodeDetails());
+      InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      dataNodeFinalizationExecutor.configureTestInjectionFunction(
+          injectionPoint, () -> {
+            failureInjectionThread.start();
+            return true;
+          });
+      ((BasicUpgradeFinalizer) ds.getDatanodeStateMachine()
+          .getUpgradeFinalizer())
+          .setFinalizationExecutor(dataNodeFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(failureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      // Rebuild the cluster so the next injection point starts clean.
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeFailuresDuringDataNodeUpgrade: Failure Injection "
+          + "Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  /*
+   * Two nodes(SCM and a targeted DataNode) combination failure case:
+   * Thread-Contexts :
+   * DataNode failure in its own DataNode-Upgrade-Context.
+   * SCM failure in its own SCM-Upgrade-Context.
+   *
+   * Fail the same DataNode that is going through its own Upgrade-processing
+   * at a specific code execution point. Also fail the SCM while it is going
+   * through upgrade-finalization. This test covers all the combinations of
+   * SCM-Upgrade-execution points and DataNode-Upgrade-execution points.
+   */
+  @Ignore
+  @Test
+  public void testAllPossibleDataNodeFailuresAndSCMFailures()
+      throws Exception {
+    for (UpgradeTestInjectionPoints scmInjectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      for (UpgradeTestInjectionPoints datanodeInjectionPoint :
+          UpgradeTestInjectionPoints.values()) {
+        testPassed.set(true);
+        // The cluster, and with it SCM, is rebuilt at the end of every inner
+        // iteration. The SCM injection must therefore be installed on the
+        // current SCM each time, not once per outer iteration; otherwise
+        // only the first DataNode injection point ever sees an SCM failure.
+        InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+            new InjectedUpgradeFinalizationExecutor();
+        scmFinalizationExecutor.configureTestInjectionFunction(
+            scmInjectionPoint,
+            () -> this.injectSCMFailureDuringSCMUpgrade());
+        ((BasicUpgradeFinalizer) scm.getUpgradeFinalizer())
+            .setFinalizationExecutor(scmFinalizationExecutor);
+
+        // Configure a given data node to restart itself when its
+        // corresponding Upgrade-Finalizer reaches a specific point in its
+        // execution.
+        HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+        Thread dataNodefailureInjectionThread =
+            injectDataNodeFailureDuringDataNodeUpgrade(ds.getDatanodeDetails());
+        InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+            new InjectedUpgradeFinalizationExecutor();
+        dataNodeFinalizationExecutor.configureTestInjectionFunction(
+            datanodeInjectionPoint, () -> {
+              dataNodefailureInjectionThread.start();
+              return true;
+            });
+        ((BasicUpgradeFinalizer) ds.getDatanodeStateMachine()
+            .getUpgradeFinalizer())
+            .setFinalizationExecutor(dataNodeFinalizationExecutor);
+        testFinalizationWithFailuerInjectionHelper(
+            dataNodefailureInjectionThread);
+        Assert.assertTrue(testPassed.get());
+        synchronized (cluster) {
+          shutdown();
+          init();
+        }
+        LOG.info("testAllPossibleDataNodeFailuresAndSCMFailures: " +
+            "DataNode-Failure-Injection-Point={} with " +
+            "Scm-FailureInjection-Point={} passed.",
+            datanodeInjectionPoint.name(), scmInjectionPoint.name());
+      }
+    }
+  }
+
+  /*
+   * Two nodes(SCM and a targeted DataNode together at the same time)
+   * combination failure case:
+   * Thread-Contexts :
+   * SCM-Upgrade-Finalizer-Context
+   *
+   * Fail the DataNodes and the SCM together while the SCM is going
+   * through upgrade. This test covers all the SCM-Upgrade-execution points.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      Thread helpingFailureInjectionThread =
+          injectSCMAndDataNodeFailureTogetherAtTheSameTime();
+      InjectedUpgradeFinalizationExecutor scmFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      scmFinalizationExecutor.configureTestInjectionFunction(
+          injectionPoint, () -> {
+            helpingFailureInjectionThread.start();
+            return true;
+          });
+      ((BasicUpgradeFinalizer) scm.getUpgradeFinalizer())
+          .setFinalizationExecutor(scmFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(
+          helpingFailureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeAndSCMFailuresTogetherDuringSCMUpgrade: Failure " +
+          "Injection Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  /*
+   * Two nodes(SCM and a targeted DataNode together at the same time)
+   * combination failure case:
+   * Thread-Contexts :
+   * DataNode-Upgrade-Finalizer-Context.
+   *
+   * Fail the DataNodes and the SCM together while a DataNode is going
+   * through upgrade. This test covers all the DataNode-Upgrade-execution
+   * points.
+   */
+  @Ignore
+  @Test
+  public void testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade()
+      throws Exception {
+    for (UpgradeTestInjectionPoints injectionPoint :
+        UpgradeTestInjectionPoints.values()) {
+      testPassed.set(true);
+      Thread helpingFailureInjectionThread =
+          injectSCMAndDataNodeFailureTogetherAtTheSameTime();
+      HddsDatanodeService ds = cluster.getHddsDatanodes().get(1);
+      InjectedUpgradeFinalizationExecutor dataNodeFinalizationExecutor =
+          new InjectedUpgradeFinalizationExecutor();
+      dataNodeFinalizationExecutor.configureTestInjectionFunction(
+          injectionPoint, () -> {
+            helpingFailureInjectionThread.start();
+            return true;
+          });
+      ((BasicUpgradeFinalizer) ds.getDatanodeStateMachine()
+          .getUpgradeFinalizer())
+          .setFinalizationExecutor(dataNodeFinalizationExecutor);
+      testFinalizationWithFailuerInjectionHelper(
+          helpingFailureInjectionThread);
+      Assert.assertTrue(testPassed.get());
+      synchronized (cluster) {
+        shutdown();
+        init();
+      }
+      LOG.info("testDataNodeAndSCMFailuresTogetherDuringDataNodeUpgrade: " +
+          "Failure Injection Point {} passed.", injectionPoint.name());
+    }
+  }
+
+  /**
+   * Drives an SCM-initiated upgrade while failures are being injected, then
+   * verifies SCM state, DataNode state and post-upgrade pipeline creation.
+   *
+   * @param failureInjectionThread thread started by an injection point, or
+   *     null; it is joined before polling so that its restarts have finished
+   * @throws Exception on any unexpected cluster, RPC or wait failure
+   */
+  public void testFinalizationWithFailuerInjectionHelper(
+      Thread failureInjectionThread) throws Exception {
+
+    waitForPipelineCreated();
+    createTestContainers();
+    createKey();
+
+    // Test the Pre-Upgrade conditions on SCM as well as DataNodes.
+    testPreUpgradeConditionsSCM();
+    testPreUpgradeConditionsDataNodes();
+
+    // Trigger Finalization on the SCM
+    StatusAndMessages status = scm.finalizeUpgrade("xyz");
+    Assert.assertEquals(STARTING_FINALIZATION, status.status());
+
+    // Make sure that any outstanding thread created by failure injection
+    // has completed its job.
+    if (failureInjectionThread != null) {
+      failureInjectionThread.join();
+    }
+
+    // Wait for the Finalization to complete on the SCM. Failure injection
+    // could have restarted the SCM, so it may report ALREADY_FINALIZED or
+    // need finalization to be re-triggered.
+    while (status.status() != FINALIZATION_DONE
+        && status.status() != ALREADY_FINALIZED) {
+      // Back off between polls instead of spinning on SCM RPCs while SCM may
+      // be restarting; the class-level Timeout rule still bounds the wait.
+      Thread.sleep(500);
+      loadSCMState();
+      status = scm.queryUpgradeFinalizationProgress("xyz", true);
+      if (status.status() == FINALIZATION_REQUIRED) {
+        status = scm.finalizeUpgrade("xyz");
+      }
+    }
+
+    // Verify Post-Upgrade conditions on the SCM.
+    // With failure injection
+    testPostUpgradeConditionsSCM();
+
+    // All datanodes on the SCM should have moved to HEALTHY-READONLY state.
+    // Due to timing constraint also allow a "HEALTHY" state.
+    loadSCMState();
+    testDataNodesStateOnSCM(HEALTHY_READONLY, HEALTHY);
+
+    // Need to wait for post finalization heartbeat from DNs.
+    LambdaTestUtils.await(600000, 500, () -> {
+      try {
+        loadSCMState();
+        testDataNodesStateOnSCM(HEALTHY, null);
+        sleep(100);
+      } catch (Throwable ex) {
+        // Assertion failures are expected until every node reports HEALTHY.
+        LOG.info(ex.getMessage());
+        return false;
+      }
+      return true;
+    });
+
+    // Verify the SCM has driven all the DataNodes through Layout Upgrade.
+    testPostUpgradeConditionsDataNodes();
+
+    // Verify that new pipeline can be created with upgraded datanodes.
+    try {
+      testPostUpgradePipelineCreation();
+    } catch (SCMException e) {
+      // If pipeline creation fails, make sure that there is a valid reason
+      // for this i.e. all datanodes are already part of some pipeline.
+      for (HddsDatanodeService dataNode : cluster.getHddsDatanodes()) {
+        DatanodeStateMachine dsm = dataNode.getDatanodeStateMachine();
+        Set<PipelineID> pipelines =
+            scm.getScmNodeManager().getPipelines(dsm.getDatanodeDetails());
+        // Require actual pipeline membership; a non-null but empty set would
+        // not explain the pipeline creation failure.
+        Assert.assertTrue("DataNode " + dsm.getDatanodeDetails()
+                + " is not part of any pipeline",
+            pipelines != null && !pipelines.isEmpty());
+      }
+    }
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
index 598b046..2b2ef7e 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -20,21 +20,18 @@ package org.apache.hadoop.ozone.om.upgrade;
import static
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUT_VERSION_KEY;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import java.io.IOException;
import java.util.Optional;
-import java.util.concurrent.Callable;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
/**
* UpgradeFinalizer implementation for the Ozone Manager service.
@@ -42,6 +39,7 @@ import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
OMLayoutVersionManager> {
private static final OmUpgradeAction NOOP = a -> {};
+ private OzoneManager ozoneManager;
public OMUpgradeFinalizer(OMLayoutVersionManager versionManager) {
super(versionManager);
@@ -50,6 +48,7 @@ public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
@Override
public StatusAndMessages finalize(String upgradeClientID, OzoneManager om)
throws IOException {
+ ozoneManager = om;
StatusAndMessages response = preFinalize(upgradeClientID, om);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
@@ -64,64 +63,36 @@ public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
// ExecutorService executor =
// Executors.newSingleThreadExecutor(r -> new Thread(threadName));
// executor.submit(new Worker(om));
- new Worker(om).call();
+ try {
+ getFinalizationExecutor().execute(ozoneManager.getOmStorage(),
+ this);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw (IOException) e;
+ }
return STARTING_MSG;
}
- /**
- * This class implements the finalization logic applied to every
- * LayoutFeature that needs to be finalized.
- *
- * For the first approach this happens synchronously within the state machine
- * during the FinalizeUpgrade request, but ideally this has to be moved to
- * individual calls that are going into the StateMaching one by one.
- * The prerequisits for this to happen in the background are the following:
- * - more fine grained control for LayoutFeatures to prepare the
- * finalization outside the state machine, do the switch from old to new
- * logic inside the statemachine and apply the finalization, and then do
- * any cleanup necessary outside the state machine
- * - a way to post a request to the state machine that is not part of the
- * client API, so basically not an OMRequest, but preferably an internal
- * request, which is posted from the leader OM to the follower OMs only.
- * - ensure that there is a possibility to implement a rollback logic if
- * something goes wrong inside the state machine, to avoid OM stuck in an
- * intermediate state due to an error.
- */
- private class Worker implements Callable<Void> {
- private OzoneManager ozoneManager;
+ @Override // Post-finalization hook: OM needs no extra work, so it is a no-op.
+ protected void postFinalizeUpgrade() throws IOException { // NOTE(review): presumably called by the finalization executor after finalizeUpgrade — confirm
+ return; // NOTE(review): redundant bare return; the body could simply be empty.
+ }
- /**
- * Initiates the Worker, for the specified OM instance.
- * @param om the OzoneManager instance on which to finalize the new
- * LayoutFeatures.
- */
- Worker(OzoneManager om) {
- ozoneManager = om;
+ @Override // Finalizes, in iteration order, every OM layout feature not yet finalized.
+ protected void finalizeUpgrade(Storage storageConfig) // storageConfig: storage whose version file is updated (finalize() hands the executor ozoneManager.getOmStorage(); presumably passed through — confirm)
+ throws UpgradeException { // presumably raised by finalizeFeature / updateLayoutVersionInVersionFile — confirm
+ for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
+ Optional<? extends UpgradeAction> action = f.action(ON_FINALIZE); // may be empty if the feature has no ON_FINALIZE action
+ finalizeFeature(f, storageConfig, action); // presumably runs the action when present — see BasicUpgradeFinalizer
+ updateLayoutVersionInVersionFile(f, storageConfig); // called per feature: if a later feature throws, earlier features were already written (unless a caller rolls back)
+ versionManager.finalized(f); // record f as finalized only after its version-file update returned
}
+ versionManager.completeFinalization(); // reached only if no feature above threw
+ }
- @Override
- public Void call() throws IOException {
- try {
- emitStartingMsg();
- versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
-
- OMStorage omStorage = ozoneManager.getOmStorage();
-
- for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
- Optional<? extends UpgradeAction> action = f.action(ON_FINALIZE);
- finalizeFeature(f, omStorage, action);
- updateLayoutVersionInVersionFile(f, ozoneManager.getOmStorage());
- versionManager.finalized(f);
- }
-
- versionManager.completeFinalization();
- emitFinishedMsg();
- return null;
- } finally {
- versionManager.setUpgradeState(FINALIZATION_DONE);
- isDone = true;
- }
- }
+ @Override // Pre-finalization hook: OM performs no checks or preparation here.
+ protected boolean preFinalizeUpgrade() throws IOException {
+ return true; // always true; NOTE(review): presumably means "proceed with finalization" — confirm in BasicUpgradeFinalizer
}
public void runPrefinalizeStateActions(Storage storage, OzoneManager om)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]