This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
new c3e3b35 HDDS-4179. Implement post-finalize SCM logic. (#1611)
c3e3b35 is described below
commit c3e3b3541025c3107ae77287466e4082ad2cd11f
Author: prashantpogde <[email protected]>
AuthorDate: Thu Dec 3 09:48:58 2020 -0800
HDDS-4179. Implement post-finalize SCM logic. (#1611)
---
.../hdds/upgrade/HDDSLayoutFeatureCatalog.java | 3 +-
.../hadoop/hdds/upgrade/HDDSUpgradeAction.java | 6 +-
.../upgrade/AbstractLayoutVersionManager.java | 9 +-
.../ozone/upgrade/BasicUpgradeFinalizer.java | 305 +++++++++++++++++++++
.../hadoop/ozone/upgrade/UpgradeException.java | 116 ++++++++
.../hadoop/ozone/upgrade/UpgradeFinalizer.java | 10 +
.../apache/hadoop/hdds/scm/node/NodeManager.java | 2 +
.../hadoop/hdds/scm/node/NodeStateManager.java | 69 +++--
.../hadoop/hdds/scm/node/SCMNodeManager.java | 6 +
.../scm/pipeline/BackgroundPipelineCreator.java | 14 +
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 4 +
.../hdds/scm/pipeline/SCMPipelineManager.java | 25 ++
.../hdds/scm/server/StorageContainerManager.java | 108 ++++++++
.../server/upgrade/NewSCMFeatureUpgradeAction.java | 34 ---
.../hdds/scm/server/upgrade/SCMUpgradeAction.java | 4 +-
.../scm/server/upgrade/SCMUpgradeFinalizer.java | 96 +++++++
.../ozone/om/upgrade/OMUpgradeFinalizer.java | 253 +++--------------
.../ozone/om/upgrade/TestOMUpgradeFinalizer.java | 27 +-
18 files changed, 802 insertions(+), 289 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
index 69832bd..5ef86af 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutFeatureCatalog.java
@@ -39,7 +39,8 @@ public class HDDSLayoutFeatureCatalog {
private int layoutVersion;
private String description;
- private Optional<HDDSUpgradeAction> hddsUpgradeAction = Optional.empty();
+ private Optional< ? extends HDDSUpgradeAction> hddsUpgradeAction =
+ Optional.empty();
HDDSLayoutFeature(final int layoutVersion, String description) {
this.layoutVersion = layoutVersion;
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
index 0808b0f..68ab666 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSUpgradeAction.java
@@ -23,5 +23,9 @@ import
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
/**
* Upgrade Action for SCM and DataNodes.
*/
-public interface HDDSUpgradeAction<T> extends UpgradeAction<T> {
+public class HDDSUpgradeAction<T> implements UpgradeAction<T> {
+ /**
+ * Default action: does nothing.
+ *
+ * @param arg argument handed to the action; unused by this implementation
+ * @throws Exception declared by the signature; never thrown by this
+ * empty body
+ */
+ @Override
+ public void executeAction(T arg) throws Exception {
+
+ }
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 831e6a1..c3ecb54 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status;
/**
* Layout Version Manager containing generic method implementations.
@@ -44,7 +45,7 @@ public abstract class AbstractLayoutVersionManager<T extends
LayoutFeature>
protected TreeMap<Integer, T> features = new TreeMap<>();
protected Map<String, T> featureMap = new HashMap<>();
protected volatile boolean isInitialized = false;
- protected volatile UpgradeFinalizer.Status currentUpgradeState =
+ protected volatile Status currentUpgradeState =
FINALIZATION_REQUIRED;
protected void init(int version, T[] lfs) throws IOException {
@@ -68,10 +69,14 @@ public abstract class AbstractLayoutVersionManager<T
extends LayoutFeature>
"AbstractLayoutVersionManager", this);
}
- public UpgradeFinalizer.Status getUpgradeState() {
+ public Status getUpgradeState() {
return currentUpgradeState;
}
+ /**
+ * Replaces the current upgrade state.
+ * The backing field is declared volatile.
+ *
+ * @param status the new upgrade state
+ */
+ public void setUpgradeState(Status status) {
+ currentUpgradeState = status;
+ }
+
private void initializeFeatures(T[] lfs) {
Arrays.stream(lfs).forEach(f -> {
Preconditions.checkArgument(!featureMap.containsKey(f.name()));
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
new file mode 100644
index 0000000..3099cef
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/BasicUpgradeFinalizer.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.INVALID_REQUEST;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
+import org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes;
+
+/**
+ * Generic base implementation of UpgradeFinalizer, parameterized by the
+ * component type (T) and its layout version manager (V).
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class BasicUpgradeFinalizer<T, V extends AbstractLayoutVersionManager>
+ implements UpgradeFinalizer<T> {
+
+ protected V versionManager;
+ protected String clientID;
+ protected T component;
+
+ private Queue<String> msgs = new ConcurrentLinkedQueue<>();
+ protected boolean isDone = false;
+
+ public BasicUpgradeFinalizer(V versionManager) {
+ this.versionManager = versionManager;
+ }
+
+ /**
+ * Returns the value of the isDone flag.
+ * NOTE(review): isDone is a plain (non-volatile) field and this method is
+ * not synchronized - confirm that readers and writers of the flag cannot
+ * run on different threads.
+ *
+ * @return current value of isDone
+ */
+ public boolean isFinalizationDone() {
+ return isDone;
+ }
+
+ /**
+ * Decides, from the version manager's current upgrade state, whether a
+ * finalization run has to be started.
+ * <p>
+ * For STARTING_FINALIZATION, FINALIZATION_IN_PROGRESS, FINALIZATION_DONE
+ * and ALREADY_FINALIZED the matching predefined message is returned and
+ * nothing is modified. For any other state the version manager is moved
+ * to STARTING_FINALIZATION, the caller is recorded as the initiating
+ * client, the component is stored, and FINALIZATION_REQUIRED_MSG is
+ * returned.
+ *
+ * @param upgradeClientID id recorded as the initiating client
+ * @param id component instance stored for use by the upgrade actions
+ * @return the status message matching the state that was found
+ * @throws UpgradeException with INVALID_REQUEST when the state is not one
+ * of the handled ones but the version manager reports that no
+ * finalization is needed
+ */
+ public synchronized StatusAndMessages preFinalize(String upgradeClientID,
+ T id)
+ throws UpgradeException {
+ switch (versionManager.getUpgradeState()) {
+ case STARTING_FINALIZATION:
+ return STARTING_MSG;
+ case FINALIZATION_IN_PROGRESS:
+ return FINALIZATION_IN_PROGRESS_MSG;
+ case FINALIZATION_DONE:
+ case ALREADY_FINALIZED:
+ return FINALIZED_MSG;
+ default:
+ if (!versionManager.needsFinalization()) {
+ throw new UpgradeException("Upgrade found in inconsistent state. " +
+ "Upgrade state is FINALIZATION_REQUIRED while MLV has been " +
+ "upgraded to SLV.", INVALID_REQUEST);
+ }
+ versionManager.setUpgradeState(STARTING_FINALIZATION);
+
+ clientID = upgradeClientID;
+ this.component = id;
+ return FINALIZATION_REQUIRED_MSG;
+ }
+ }
+
+ /*
+ * This method must be overridden by the component implementing the
+ * finalization logic.
+ *
+ * As written here it only runs preFinalize(): if that does not answer
+ * with FINALIZATION_REQUIRED its response is returned unchanged,
+ * otherwise STARTING_MSG is returned without doing any finalization work.
+ */
+ public StatusAndMessages finalize(String upgradeClientID, T id)
+ throws IOException {
+ StatusAndMessages response = preFinalize(upgradeClientID, id);
+ if (response.status() != FINALIZATION_REQUIRED) {
+ return response;
+ }
+
+ /**
+ * Overriding class should schedule actual finalization logic
+ * in a separate thread here.
+ */
+ return STARTING_MSG;
+ }
+
+  /**
+   * Returns the current upgrade state together with all status messages
+   * queued since the previous call. Returned messages are removed from the
+   * queue.
+   *
+   * @param upgradeClientID id of the client asking for the status
+   * @param takeover when true, the requestor is recorded as the initiating
+   *                 client before the client check is made
+   * @return the current state and the drained messages
+   * @throws UpgradeException with INVALID_REQUEST if the requestor is not
+   *         the recorded client
+   */
+  @Override
+  public synchronized StatusAndMessages reportStatus(
+      String upgradeClientID, boolean takeover
+  ) throws UpgradeException {
+    if (takeover) {
+      clientID = upgradeClientID;
+    }
+    assertClientId(upgradeClientID);
+    Status status = versionManager.getUpgradeState();
+    // Drain with poll() until it returns null. ConcurrentLinkedQueue.size()
+    // traverses the whole queue, so testing it on every iteration made the
+    // drain quadratic in the number of queued messages.
+    List<String> returningMsgs = new ArrayList<>();
+    for (String msg = msgs.poll(); msg != null; msg = msgs.poll()) {
+      returningMsgs.add(msg);
+    }
+    return new StatusAndMessages(status, returningMsgs);
+  }
+
+  /**
+   * Verifies that the requestor is the client recorded for the finalization.
+   *
+   * @param id client id of the requestor
+   * @throws UpgradeException with INVALID_REQUEST if no client has been
+   *         recorded yet, or if {@code id} differs from the recorded client
+   */
+  private void assertClientId(String id) throws UpgradeException {
+    // clientID is null until preFinalize() or a takeover records one; report
+    // that as an unknown client instead of failing with a
+    // NullPointerException.
+    if (this.clientID == null || !this.clientID.equals(id)) {
+      throw new UpgradeException("Unknown client tries to get finalization " +
+          "status.\n The requestor is not the initiating client of the " +
+          "finalization, if you want to take over, and get unsent status " +
+          "messages, check -takeover option.", INVALID_REQUEST);
+    }
+  }
+
+ /**
+ * Runs the finalization action of a single layout feature.
+ * <p>
+ * When the feature defines no action, a no-op message is emitted and
+ * nothing else happens. Otherwise the finalization mark is persisted, the
+ * action is executed against the stored component, and the mark is
+ * removed again.
+ *
+ * @param feature the layout feature to finalize
+ * @param config storage whose persisted state carries the finalization mark
+ * @throws UpgradeException if persisting or removing the mark fails, or if
+ * the action throws
+ */
+ protected void finalizeFeature(LayoutFeature feature, Storage config)
+ throws UpgradeException {
+ Optional<? extends UpgradeAction> action = feature.onFinalizeAction();
+
+ if (!action.isPresent()) {
+ emitNOOPMsg(feature.name());
+ return;
+ }
+
+ putFinalizationMarkIntoVersionFile(feature, config);
+
+ emitStartingFinalizationActionMsg(feature.name());
+ try {
+ // NOTE(review): the Optional holds a raw UpgradeAction, so this
+ // assignment is an unchecked conversion to UpgradeAction<T>.
+ UpgradeAction<T> newaction = action.get();
+ newaction.executeAction(component);
+ } catch (Exception e) {
+ // Always rethrows as UpgradeException, so on failure the mark removal
+ // below is not reached and the mark stays in place.
+ logFinalizationFailureAndThrow(e, feature.name());
+ }
+ emitFinishFinalizationActionMsg(feature.name());
+
+ removeFinalizationMarkFromVersionFile(feature, config);
+ }
+
+ /**
+ * Sets the layout version of the given feature on the storage config and
+ * persists it.
+ * <p>
+ * If persisting fails, the layout version held by the config is set back
+ * to the value it had before, and the failure is logged and rethrown.
+ *
+ * @param feature feature whose layout version is to be stored
+ * @param config storage to update and persist
+ * @throws UpgradeException with UPDATE_LAYOUT_VERSION_FAILED when
+ * persisting fails
+ */
+ protected void updateLayoutVersionInVersionFile(LayoutFeature feature,
+ Storage config)
+ throws UpgradeException {
+ int prevLayoutVersion = currentStoredLayoutVersion(config);
+
+ updateStorageLayoutVersion(feature.layoutVersion(), config);
+ try {
+ persistStorage(config);
+ } catch (IOException e) {
+ updateStorageLayoutVersion(prevLayoutVersion, config);
+ logLayoutVersionUpdateFailureAndThrow(e);
+ }
+ }
+
+ /**
+ * Records in the storage config that the given feature is being finalized
+ * (sets the upgrade-to layout version) and persists the config, emitting a
+ * status message before and after.
+ *
+ * @param feature feature whose layout version is used as the mark
+ * @param config storage to update and persist
+ * @throws UpgradeException with PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED
+ * when persisting fails
+ */
+ protected void putFinalizationMarkIntoVersionFile(LayoutFeature feature,
+ Storage config)
+ throws UpgradeException {
+ try {
+ emitUpgradeToLayoutVersionPersistingMsg(feature.name());
+
+ setUpgradeToLayoutVersionInStorage(feature.layoutVersion(), config);
+ persistStorage(config);
+
+ emitUpgradeToLayoutVersionPersistedMsg();
+ } catch (IOException e) {
+ logUpgradeToLayoutVersionPersistingFailureAndThrow(feature.name(), e);
+ }
+ }
+
+ /**
+ * Clears the upgrade-to layout version from the storage config and
+ * persists the config, emitting a status message before and after.
+ *
+ * @param feature feature named in the emitted and logged messages
+ * @param config storage to update and persist
+ * @throws UpgradeException with REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED
+ * when persisting fails
+ */
+ protected void removeFinalizationMarkFromVersionFile(
+ LayoutFeature feature, Storage config) throws UpgradeException {
+ try {
+ emitRemovingUpgradeToLayoutVersionMsg(feature.name());
+
+ unsetUpgradeToLayoutVersionInStorage(config);
+ persistStorage(config);
+
+ emitRemovedUpgradeToLayoutVersionMsg();
+ } catch (IOException e) {
+ logUpgradeToLayoutVersionRemovalFailureAndThrow(feature.name(), e);
+ }
+ }
+
+ private void setUpgradeToLayoutVersionInStorage(int version,
+ Storage config) {
+ config.setUpgradeToLayoutVersion(version);
+ }
+
+ private void unsetUpgradeToLayoutVersionInStorage(Storage config) {
+ config.unsetUpgradeToLayoutVersion();
+ }
+
+ private int currentStoredLayoutVersion(Storage config) {
+ return config.getLayoutVersion();
+ }
+
+ private void updateStorageLayoutVersion(int version, Storage config) {
+ config.setLayoutVersion(version);
+ }
+
+ private void persistStorage(Storage config) throws IOException {
+ config.persistCurrentState();
+ }
+
+ protected void emitNOOPMsg(String feature) {
+ String msg = "No finalization work defined for feature: " + feature + ".";
+ String msg2 = "Skipping.";
+
+ logAndEmit(msg);
+ logAndEmit(msg2);
+ }
+
+ protected void emitStartingMsg() {
+ String msg = "Finalization started.";
+ logAndEmit(msg);
+ }
+
+ protected void emitFinishedMsg() {
+ String msg = "Finalization is done.";
+ logAndEmit(msg);
+ }
+
+ protected void emitStartingFinalizationActionMsg(String feature) {
+ String msg = "Executing finalization of feature: " + feature + ".";
+ logAndEmit(msg);
+ }
+
+ protected void emitFinishFinalizationActionMsg(String feature) {
+ String msg = "The feature " + feature + " is finalized.";
+ logAndEmit(msg);
+ }
+
+ private void emitUpgradeToLayoutVersionPersistingMsg(String feature) {
+ String msg = "Mark finalization of " + feature + " in VERSION file.";
+ logAndEmit(msg);
+ }
+
+ private void emitUpgradeToLayoutVersionPersistedMsg() {
+ String msg = "Finalization mark placed.";
+ logAndEmit(msg);
+ }
+
+ private void emitRemovingUpgradeToLayoutVersionMsg(String feature) {
+ String msg = "Remove finalization mark of " + feature
+ + " feature from VERSION file.";
+ logAndEmit(msg);
+ }
+
+ private void emitRemovedUpgradeToLayoutVersionMsg() {
+ String msg = "Finalization mark removed.";
+ logAndEmit(msg);
+ }
+
+ private void logAndEmit(String msg) {
+ LOG.info(msg);
+ msgs.offer(msg);
+ }
+
+ protected void logFinalizationFailureAndThrow(Exception e, String feature)
+ throws UpgradeException {
+ String msg = "Error during finalization of " + feature + ".";
+ logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
+ }
+
+ private void logLayoutVersionUpdateFailureAndThrow(IOException e)
+ throws UpgradeException {
+ String msg = "Updating the LayoutVersion in the VERSION file failed.";
+ logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
+ }
+
+ private void logUpgradeToLayoutVersionPersistingFailureAndThrow(
+ String feature, IOException e
+ ) throws UpgradeException {
+ String msg = "Failed to update VERSION file with the upgrading feature: "
+ + feature + ".";
+ logAndThrow(e, msg, PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+ }
+
+ private void logUpgradeToLayoutVersionRemovalFailureAndThrow(
+ String feature, IOException e) throws UpgradeException {
+ String msg =
+ "Failed to unmark finalization of " + feature + " LayoutFeature.";
+ logAndThrow(e, msg, REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+ }
+
+ private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
+ throws UpgradeException {
+ LOG.error(msg, e);
+ throw new UpgradeException(msg, e, resultCode);
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
new file mode 100644
index 0000000..7b3a5c0
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeException.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.upgrade;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown when upgrade fails.
+ */
+public class UpgradeException extends IOException {
+
+ public static final String STATUS_CODE = "STATUS_CODE=";
+ // Result code describing why the upgrade step failed.
+ private final UpgradeException.ResultCodes result;
+
+ /**
+ * Constructs an {@code UpgradeException} with {@code null}
+ * as its error detail message.
+ *
+ * @param result The result code describing the failure
+ */
+ public UpgradeException(UpgradeException.ResultCodes result) {
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code UpgradeException} with the specified detail
+ * message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the
+ * {@link #getMessage()} method)
+ * @param result The result code describing the failure
+ */
+ public UpgradeException(String message, UpgradeException.ResultCodes result)
{
+ super(message);
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code UpgradeException} with the specified detail message
+ * and cause.
+ * <p>
+ * Note that the detail message associated with {@code cause} is
+ * <i>not</i> automatically incorporated into this exception's detail
+ * message.
+ *
+ * @param message The detail message (which is saved for later retrieval by
+ * the
+ * {@link #getMessage()} method)
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and indicates that the
+ * cause is nonexistent or unknown.)
+ * @param result The result code describing the failure
+ */
+ public UpgradeException(String message, Throwable cause,
+ UpgradeException.ResultCodes result) {
+ super(message, cause);
+ this.result = result;
+ }
+
+ /**
+ * Constructs an {@code UpgradeException} with the specified cause and a
+ * detail message of {@code (cause==null ? null : cause.toString())}
+ * (which typically contains the class and detail message of {@code cause}).
+ * This constructor is useful for exceptions that are little more
+ * than wrappers for other throwables.
+ *
+ * @param cause The cause (which is saved for later retrieval by the {@link
+ * #getCause()} method). (A null value is permitted, and indicates that the
+ * cause is nonexistent or unknown.)
+ * @param result The result code describing the failure
+ */
+ public UpgradeException(Throwable cause,
+ UpgradeException.ResultCodes result) {
+ super(cause);
+ this.result = result;
+ }
+
+ /**
+ * Returns the result code given at construction time.
+ * @return ResultCode
+ */
+ public UpgradeException.ResultCodes getResult() {
+ return result;
+ }
+
+ /**
+ * Returns the result code, a space, and the inherited string form of the
+ * exception.
+ */
+ @Override
+ public String toString() {
+ return result + " " + super.toString();
+ }
+ /**
+ * Error codes to make it easy to decode these exceptions.
+ */
+ public enum ResultCodes {
+
+ OK,
+
+ INVALID_REQUEST,
+
+ PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+ REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+ UPDATE_LAYOUT_VERSION_FAILED,
+ LAYOUT_FEATURE_FINALIZATION_FAILED;
+ }
+}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
index 5568d59..64e83c7 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
@@ -112,6 +112,16 @@ public interface UpgradeFinalizer<T> {
Arrays.asList("Starting Finalization")
);
+ StatusAndMessages FINALIZATION_IN_PROGRESS_MSG = new StatusAndMessages(
+ Status.FINALIZATION_IN_PROGRESS,
+ Arrays.asList("Finalization in progress")
+ );
+
+ StatusAndMessages FINALIZATION_REQUIRED_MSG = new StatusAndMessages(
+ Status.FINALIZATION_REQUIRED,
+ Arrays.asList("Finalization required")
+ );
+
/**
* Default message to provide when the service is in ALREADY_FINALIZED state.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 3952565..48c9e04 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -235,4 +235,6 @@ public interface NodeManager extends
StorageContainerNodeProtocol,
default HDDSLayoutVersionManager getLayoutVersionManager(){
return null;
}
+
+ /**
+ * Forces nodes into the HEALTHY_READONLY state.
+ * The default implementation does nothing.
+ */
+ default void forceNodesToHealthyReadOnly() { }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 9cc403f..a01cca73 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -56,6 +56,13 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONED;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DECOMMISSIONING;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY_READONLY;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -166,7 +173,7 @@ public class NodeStateManager implements Runnable,
Closeable {
this.state2EventMap = new HashMap<>();
initialiseState2EventMap();
Set<NodeState> finalStates = new HashSet<>();
- finalStates.add(NodeState.DECOMMISSIONED);
+ finalStates.add(DECOMMISSIONED);
// All DataNodes should start in HealthyReadOnly state.
this.stateMachine = new StateMachine<>(NodeState.HEALTHY_READONLY,
finalStates);
@@ -194,10 +201,10 @@ public class NodeStateManager implements Runnable,
Closeable {
* Populates state2event map.
*/
private void initialiseState2EventMap() {
- state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE);
- state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE);
+ state2EventMap.put(STALE, SCMEvents.STALE_NODE);
+ state2EventMap.put(DEAD, SCMEvents.DEAD_NODE);
state2EventMap
- .put(NodeState.HEALTHY, SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE);
+ .put(HEALTHY, SCMEvents.READ_ONLY_HEALTHY_TO_HEALTHY_NODE);
state2EventMap
.put(NodeState.HEALTHY_READONLY,
SCMEvents.NON_HEALTHY_TO_READONLY_HEALTHY_NODE);
@@ -289,38 +296,38 @@ public class NodeStateManager implements Runnable,
Closeable {
*/
private void initializeStateMachine() {
stateMachine.addTransition(
- NodeState.HEALTHY_READONLY, NodeState.HEALTHY,
+ HEALTHY_READONLY, HEALTHY,
NodeLifeCycleEvent.LAYOUT_MATCH);
stateMachine.addTransition(
- NodeState.HEALTHY_READONLY, NodeState.STALE,
+ HEALTHY_READONLY, STALE,
NodeLifeCycleEvent.TIMEOUT);
stateMachine.addTransition(
- NodeState.HEALTHY_READONLY, NodeState.DECOMMISSIONING,
+ HEALTHY_READONLY, DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
- NodeState.HEALTHY, NodeState.STALE, NodeLifeCycleEvent.TIMEOUT);
+ HEALTHY, STALE, NodeLifeCycleEvent.TIMEOUT);
stateMachine.addTransition(
- NodeState.HEALTHY, NodeState.HEALTHY_READONLY,
+ HEALTHY, HEALTHY_READONLY,
NodeLifeCycleEvent.LAYOUT_MISMATCH);
stateMachine.addTransition(
- NodeState.STALE, NodeState.DEAD, NodeLifeCycleEvent.TIMEOUT);
+ STALE, DEAD, NodeLifeCycleEvent.TIMEOUT);
stateMachine.addTransition(
- NodeState.STALE, NodeState.HEALTHY_READONLY,
+ STALE, HEALTHY_READONLY,
NodeLifeCycleEvent.RESTORE);
stateMachine.addTransition(
- NodeState.DEAD, NodeState.HEALTHY_READONLY,
+ DEAD, HEALTHY_READONLY,
NodeLifeCycleEvent.RESURRECT);
stateMachine.addTransition(
- NodeState.HEALTHY, NodeState.DECOMMISSIONING,
+ HEALTHY, DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
- NodeState.STALE, NodeState.DECOMMISSIONING,
+ STALE, DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
- NodeState.DEAD, NodeState.DECOMMISSIONING,
+ DEAD, DECOMMISSIONING,
NodeLifeCycleEvent.DECOMMISSION);
stateMachine.addTransition(
- NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED,
+ DECOMMISSIONING, DECOMMISSIONED,
NodeLifeCycleEvent.DECOMMISSIONED);
}
@@ -418,7 +425,7 @@ public class NodeStateManager implements Runnable,
Closeable {
*/
public List<DatanodeInfo> getHealthyNodes() {
List<DatanodeInfo> allHealthyNodes;
- allHealthyNodes = getNodes(NodeState.HEALTHY);
+ allHealthyNodes = getNodes(HEALTHY);
allHealthyNodes.addAll(getNodes(NodeState.HEALTHY_READONLY));
return allHealthyNodes;
}
@@ -429,7 +436,7 @@ public class NodeStateManager implements Runnable,
Closeable {
* @return list of stale nodes
*/
public List<DatanodeInfo> getStaleNodes() {
- return getNodes(NodeState.STALE);
+ return getNodes(STALE);
}
/**
@@ -438,7 +445,7 @@ public class NodeStateManager implements Runnable,
Closeable {
* @return list of dead nodes
*/
public List<DatanodeInfo> getDeadNodes() {
- return getNodes(NodeState.DEAD);
+ return getNodes(DEAD);
}
/**
@@ -500,7 +507,7 @@ public class NodeStateManager implements Runnable,
Closeable {
* @return healthy node count
*/
public int getHealthyNodeCount() {
- return getNodeCount(NodeState.HEALTHY) +
+ return getNodeCount(HEALTHY) +
getNodeCount(NodeState.HEALTHY_READONLY);
}
@@ -510,7 +517,7 @@ public class NodeStateManager implements Runnable,
Closeable {
* @return stale node count
*/
public int getStaleNodeCount() {
- return getNodeCount(NodeState.STALE);
+ return getNodeCount(STALE);
}
/**
@@ -519,7 +526,7 @@ public class NodeStateManager implements Runnable,
Closeable {
* @return dead node count
*/
public int getDeadNodeCount() {
- return getNodeCount(NodeState.DEAD);
+ return getNodeCount(DEAD);
}
/**
@@ -629,6 +636,24 @@ public class NodeStateManager implements Runnable,
Closeable {
scheduleNextHealthCheck();
}
+  /**
+   * Moves every datanode that is currently in the HEALTHY state to
+   * HEALTHY_READONLY and, when an event is registered for HEALTHY_READONLY,
+   * fires that event for each moved node.
+   * <p>
+   * A NodeNotFoundException raised while processing a node ends the loop;
+   * it is logged and not propagated to the caller.
+   */
+  public void forceNodesToHealthyReadOnly() {
+    try {
+      List<UUID> nodes = nodeStateMap.getNodes(HEALTHY);
+      for (UUID id : nodes) {
+        DatanodeInfo node = nodeStateMap.getNodeInfo(id);
+        nodeStateMap.updateNodeState(node.getUuid(), HEALTHY,
+            HEALTHY_READONLY);
+        if (state2EventMap.containsKey(HEALTHY_READONLY)) {
+          eventPublisher.fireEvent(state2EventMap.get(HEALTHY_READONLY),
+              node);
+        }
+      }
+    } catch (NodeNotFoundException ex) {
+      // Hand the exception to the logger as the trailing argument so the
+      // stack trace lands in the service log instead of on stderr.
+      LOG.error("Inconsistent NodeStateMap! {}", nodeStateMap, ex);
+    }
+  }
+
private void checkNodesHealth() {
/*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 86ef122..51c84dc 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -884,4 +884,10 @@ public class SCMNodeManager implements NodeManager {
public HDDSLayoutVersionManager getLayoutVersionManager() {
return scmLayoutVersionManager;
}
+
+ /**
+ * Delegates to the node state manager's forceNodesToHealthyReadOnly().
+ */
+ @VisibleForTesting
+ @Override
+ public void forceNodesToHealthyReadOnly() {
+ nodeStateManager.forceNodesToHealthyReadOnly();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index f240293..18e974a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -46,15 +46,25 @@ class BackgroundPipelineCreator {
private final PipelineManager pipelineManager;
private final ConfigurationSource conf;
private ScheduledFuture<?> periodicTask;
+ private AtomicBoolean pausePipelineCreation;
BackgroundPipelineCreator(PipelineManager pipelineManager,
Scheduler scheduler, ConfigurationSource conf) {
this.pipelineManager = pipelineManager;
this.conf = conf;
this.scheduler = scheduler;
+ this.pausePipelineCreation = new AtomicBoolean(false);
isPipelineCreatorRunning = new AtomicBoolean(false);
}
+ /**
+ * Sets the pause flag. While the flag is set, createPipelines() returns
+ * without creating anything.
+ */
+ public void pause() {
+ pausePipelineCreation.set(true);
+ }
+
+ /**
+ * Clears the pause flag so that createPipelines() no longer returns
+ * early because of it.
+ */
+ public void resume() {
+ pausePipelineCreation.set(false);
+ }
+
private boolean shouldSchedulePipelineCreator() {
return isPipelineCreatorRunning.compareAndSet(false, true);
}
@@ -105,6 +115,10 @@ class BackgroundPipelineCreator {
private void createPipelines() {
// TODO: #CLUTIL Different replication factor may need to be supported
+
+ if(pausePipelineCreation.get()) {
+ return;
+ }
HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 0cb905e..ed0b047 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -125,4 +125,8 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean,
* @return boolean
*/
boolean getSafeModeStatus();
+
+ void freezePipelineCreation();
+
+ void resumePipelineCreation();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index e0ea885..91c49c3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -90,6 +90,10 @@ public class SCMPipelineManager implements PipelineManager {
// to prevent pipelines being created until sufficient nodes have registered.
private final AtomicBoolean pipelineCreationAllowed;
+ // This allows for freezing/resuming the new pipeline creation while the
+ // SCM is already out of SafeMode.
+ private AtomicBoolean freezePipelineCreation;
+
public SCMPipelineManager(ConfigurationSource conf,
NodeManager nodeManager,
Table<PipelineID, Pipeline> pipelineStore,
@@ -134,6 +138,9 @@ public class SCMPipelineManager implements PipelineManager {
// Pipeline creation is only allowed after the safemode prechecks have
// passed, eg sufficient nodes have registered.
this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
+ // controls freezing/resuming pipeline creation regardless of SafeMode
+ // status.
+ this.freezePipelineCreation = new AtomicBoolean(false);
}
public PipelineStateManager getStateManager() {
@@ -266,6 +273,12 @@ public class SCMPipelineManager implements PipelineManager
{
throw new IOException("Pipeline creation is not allowed as safe mode " +
"prechecks have not yet passed");
}
+ if (freezePipelineCreation.get()) {
+ LOG.debug("Pipeline creation is frozen while an upgrade is in " +
+ "progress");
+ throw new IOException("Pipeline creation is frozen while an upgrade " +
+ "is in progress");
+ }
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -749,6 +762,18 @@ public class SCMPipelineManager implements PipelineManager
{
return this.isInSafeMode.get();
}
+ /**
+ * Freezes pipeline creation: sets the freeze flag, which is checked before
+ * a new pipeline is created, and pauses the background pipeline creator.
+ */
+ @Override
+ public void freezePipelineCreation() {
+ freezePipelineCreation.set(true);
+ backgroundPipelineCreator.pause();
+ }
+
+ /**
+ * Resumes pipeline creation: clears the freeze flag and resumes the
+ * background pipeline creator.
+ */
+ @Override
+ public void resumePipelineCreation() {
+ freezePipelineCreation.set(false);
+ backgroundPipelineCreator.resume();
+ }
+
public Table<PipelineID, Pipeline> getPipelineStore() {
return pipelineStore;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 841426f..c0b4689 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
@@ -56,6 +57,7 @@ import
org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -80,6 +82,8 @@ import
org.apache.hadoop.hdds.scm.node.NonHealthyToReadOnlyHealthyNodeHandler;
import org.apache.hadoop.hdds.scm.node.ReadOnlyHealthyToHealthyNodeHandler;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
@@ -119,6 +123,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.protobuf.BlockingService;
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
+import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1157,4 +1163,106 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
public HDDSLayoutVersionManager getLayoutVersionManager() {
return scmLayoutVersionManager;
}
+
+  /**
+   * Blocks until no container reported for any datanode is still OPEN or
+   * CLOSING. A CLOSE_CONTAINER event is fired for every OPEN container that
+   * is found, and the scan is repeated every five seconds.
+   *
+   * If the waiting thread is interrupted, its interrupt status is restored
+   * and the method returns early, possibly before all containers are closed.
+   */
+  private void waitForAllContainersToClose() {
+    boolean containersFound = true;
+    while (containersFound) {
+      containersFound = false;
+      for (DatanodeDetails datanodeDetails : scmNodeManager.getAllNodes()) {
+        try {
+          for (ContainerID id :
+              scmNodeManager.getContainers(datanodeDetails)) {
+            try {
+              final ContainerInfo container =
+                  containerManager.getContainer(id);
+              if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
+                containersFound = true;
+                eventQueue.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
+              } else if (container.getState()
+                  == HddsProtos.LifeCycleState.CLOSING) {
+                containersFound = true;
+              }
+            } catch (ContainerNotFoundException cnfe) {
+              LOG.warn("Container {} is not managed by ContainerManager.",
+                  id, cnfe);
+            }
+          }
+        } catch (NodeNotFoundException ignored) {
+          // The node manager has no container list for this node, so there
+          // is nothing to wait for on it; move on to the next datanode.
+        }
+      }
+      if (containersFound) {
+        LOG.info("Waiting for all containers to close.");
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          // Do not swallow the interrupt: restore the flag so the owner of
+          // this thread can observe it, and stop waiting.
+          LOG.warn("Interrupted while waiting for all containers to close.");
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Blocks until every pipeline known to the pipeline manager is CLOSED.
+   * Each pipeline that is not CLOSED is passed to
+   * finalizeAndDestroyPipeline, and the scan is repeated every five seconds.
+   *
+   * @throws IOException if finalizing a pipeline fails, or (as an
+   *         InterruptedIOException) if the waiting thread is interrupted.
+   */
+  private void waitForAllPipelinesToDestroy() throws IOException {
+    boolean pipelineFound = true;
+    while (pipelineFound) {
+      pipelineFound = false;
+      for (Pipeline pipeline : pipelineManager.getPipelines()) {
+        if (pipeline.getPipelineState() != CLOSED) {
+          pipelineFound = true;
+          pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+        }
+      }
+      if (pipelineFound) {
+        LOG.info("Waiting for all pipelines to close.");
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          // Do not swallow the interrupt: restore the flag and abort the
+          // wait so that the caller does not proceed with finalization.
+          Thread.currentThread().interrupt();
+          final IOException interrupted = new java.io.InterruptedIOException(
+              "Interrupted while waiting for all pipelines to close.");
+          interrupted.initCause(e);
+          throw interrupted;
+        }
+      }
+    }
+  }
+
+ /**
+ * Prepares SCM for upgrade finalization.
+ *
+ * This should be called in the context of a separate finalize upgrade
+ * thread. This function can block indefinitely till the conditions are met
+ * to safely finalize Upgrade.
+ *
+ * @throws IOException if thrown while waiting for the existing pipelines
+ *         to be destroyed.
+ */
+ public void preFinalizeUpgrade() throws IOException {
+ /*
+ * Ask pipeline manager to not create any new pipelines. Pipeline
+ * creation will remain frozen until postFinalizeUpgrade().
+ */
+ pipelineManager.freezePipelineCreation();
+
+ /*
+ * Destroy the existing pipelines and wait until none of them remains.
+ * NOTE(review): waitForAllContainersToClose() is not invoked here;
+ * presumably destroying the pipelines also closes their open
+ * containers - confirm.
+ */
+ waitForAllPipelinesToDestroy();
+
+ /*
+ * We can not yet move all the existing data nodes to HEALTHY-READONLY
+ * state since the next heartbeat will move them back to HEALTHY state.
+ * This has to wait till postFinalizeUpgrade, when SCM MLV version is
+ * already upgraded as part of finalize processing.
+ * While in this state, it should be safe to do finalize processing for
+ * all new features. This will also update ondisk mlv version. Any
+ * disrupting upgrade can add a hook here to make sure that SCM is in a
+ * consistent state while finalizing the upgrade.
+ */
+ }
+
+ /**
+ * Completes upgrade finalization on the SCM side: forces the datanodes to
+ * the Healthy-ReadOnly state and then resumes pipeline creation, which
+ * preFinalizeUpgrade() froze.
+ */
+ public void postFinalizeUpgrade() {
+ /*
+ * Don't wait for next heartbeat from datanodes in order to move them to
+ * Healthy-Readonly state. Force them to Healthy-ReadOnly state so that
+ * we can resume pipeline creation right away.
+ */
+ scmNodeManager.forceNodesToHealthyReadOnly();
+
+ /*
+ * Allow pipeline manager to create any new pipelines if it can
+ * find enough Healthy data nodes.
+ */
+ pipelineManager.resumePipelineCreation();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
deleted file mode 100644
index 88afad1..0000000
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/NewSCMFeatureUpgradeAction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.server.upgrade;
-
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
-
-/**
- * Example SCM Action class to help with understanding.
- */
-public class NewSCMFeatureUpgradeAction implements
- HDDSUpgradeAction<StorageContainerManager> {
-
- @Override
- public void executeAction(StorageContainerManager scm) {
- // Do blah....
- }
-}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
index 809c3dd..8137756 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeAction.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
/**
* Upgrade Action for StorageContainerManager which takes in an 'SCM' instance.
*/
-public interface SCMUpgradeAction<T> extends
+public class SCMUpgradeAction extends
 HDDSUpgradeAction<StorageContainerManager> {
+ // No-op: the body is empty, so executing this action does nothing.
+ public void executeAction(StorageContainerManager arg) throws Exception {
+ }
 }
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
new file mode 100644
index 0000000..bf91ccd
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/upgrade/SCMUpgradeFinalizer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.upgrade;
+
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds
+ .upgrade.HDDSLayoutFeatureCatalog.HDDSLayoutFeature;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
+
+/**
+ * UpgradeFinalizer for the Storage Container Manager service.
+ */
+public class SCMUpgradeFinalizer extends
+    BasicUpgradeFinalizer<StorageContainerManager, HDDSLayoutVersionManager> {
+
+  public SCMUpgradeFinalizer(HDDSLayoutVersionManager versionManager) {
+    super(versionManager);
+  }
+
+  /**
+   * Finalizes the layout features of the given SCM.
+   *
+   * The finalization work is executed synchronously on the calling thread;
+   * by the time STARTING_MSG is returned the Worker has already completed.
+   *
+   * @param upgradeClientID the ID of the client initiating finalization.
+   * @param scm the StorageContainerManager instance to finalize.
+   * @return the preFinalize response if its status is not
+   *         FINALIZATION_REQUIRED, STARTING_MSG otherwise.
+   * @throws IOException if preFinalize or the finalization work fails.
+   */
+  @Override
+  public StatusAndMessages finalize(String upgradeClientID,
+                                    StorageContainerManager scm)
+      throws IOException {
+    StatusAndMessages response = preFinalize(upgradeClientID, scm);
+    if (response.status() != FINALIZATION_REQUIRED) {
+      return response;
+    }
+    new Worker(scm).call();
+    return STARTING_MSG;
+  }
+
+  /**
+   * Performs the finalization steps against one SCM instance.
+   */
+  private class Worker implements Callable<Void> {
+    private final StorageContainerManager storageContainerManager;
+
+    /**
+     * Initiates the Worker, for the specified SCM instance.
+     * @param scm the StorageContainerManager instance on which to finalize
+     *            the new LayoutFeatures.
+     */
+    Worker(StorageContainerManager scm) {
+      storageContainerManager = scm;
+    }
+
+    /**
+     * Runs pre-finalize, finalizes every unfinalized layout feature and
+     * persists its layout version, then runs post-finalize.
+     *
+     * Note that the upgrade state is set to FINALIZATION_DONE and isDone is
+     * set to true in the finally block, i.e. also when a step above throws.
+     */
+    @Override
+    public Void call() throws IOException {
+      try {
+        emitStartingMsg();
+        versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
+        /*
+         * Before we can finalize the features, we need to make sure that
+         * all existing pipelines are closed and that the pipeline manager
+         * has frozen all new pipeline creation.
+         */
+        storageContainerManager.preFinalizeUpgrade();
+
+        for (HDDSLayoutFeature f : versionManager.unfinalizedFeatures()) {
+          finalizeFeature(f, storageContainerManager.getScmStorageConfig());
+          updateLayoutVersionInVersionFile(f,
+              storageContainerManager.getScmStorageConfig());
+          versionManager.finalized(f);
+        }
+        versionManager.completeFinalization();
+        storageContainerManager.postFinalizeUpgrade();
+        emitFinishedMsg();
+        return null;
+      } finally {
+        versionManager.setUpgradeState(FINALIZATION_DONE);
+        isDone = true;
+      }
+    }
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
index 4882fcf..0850cf1 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -18,90 +18,48 @@
package org.apache.hadoop.ozone.om.upgrade;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.*;
-import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
-import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
-import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
-import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
-import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
/**
* UpgradeFinalizer implementation for the Ozone Manager service.
*/
-public class OMUpgradeFinalizer implements UpgradeFinalizer<OzoneManager> {
-
+public class OMUpgradeFinalizer extends BasicUpgradeFinalizer<OzoneManager,
+ OMLayoutVersionManagerImpl> {
private static final OmUpgradeAction NOOP = a -> {};
- private OMLayoutVersionManagerImpl versionManager;
- private String clientID;
-
- private Queue<String> msgs = new ConcurrentLinkedQueue<>();
- private boolean isDone = false;
-
public OMUpgradeFinalizer(OMLayoutVersionManagerImpl versionManager) {
- this.versionManager = versionManager;
+ super(versionManager);
}
@Override
public StatusAndMessages finalize(String upgradeClientID, OzoneManager om)
throws IOException {
- if (!versionManager.needsFinalization()) {
- return FINALIZED_MSG;
- }
- clientID = upgradeClientID;
-
-// This requires some more investigation on how to do it properly while
-// requests are on the fly, and post finalize features one by one.
-// Until that is done, monitoring is not really doing anything meaningful
-// but this is a tradoff we can take for the first iteration either if needed,
-// as the finalization of the first few features should not take that long.
-// Follow up JIRA is in HDDS-4286
-// String threadName = "OzoneManager-Upgrade-Finalizer";
-// ExecutorService executor =
-// Executors.newSingleThreadExecutor(r -> new Thread(threadName));
-// executor.submit(new Worker(om));
+ StatusAndMessages response = preFinalize(upgradeClientID, om);
+ if (response.status() != FINALIZATION_REQUIRED) {
+ return response;
+ }
+ // This requires some more investigation on how to do it properly while
+ // requests are on the fly, and post finalize features one by one.
+ // Until that is done, monitoring is not really doing anything meaningful,
+ // but this is a tradeoff we can accept for the first iteration if
+ // needed, as the finalization of the first few features should not take
+ // that long. The follow-up JIRA is HDDS-4286.
+ // String threadName = "OzoneManager-Upgrade-Finalizer";
+ // ExecutorService executor =
+ // Executors.newSingleThreadExecutor(r -> new Thread(threadName));
+ // executor.submit(new Worker(om));
new Worker(om).call();
return STARTING_MSG;
}
- @Override
- public synchronized StatusAndMessages reportStatus(
- String upgradeClientID, boolean takeover
- ) throws IOException {
- if (takeover) {
- clientID = upgradeClientID;
- }
- assertClientId(upgradeClientID);
- List<String> returningMsgs = new ArrayList<>(msgs.size()+10);
- Status status = isDone ? FINALIZATION_DONE : FINALIZATION_IN_PROGRESS;
- while (msgs.size() > 0) {
- returningMsgs.add(msgs.poll());
- }
- return new StatusAndMessages(status, returningMsgs);
- }
-
- private void assertClientId(String id) throws OMException {
- if (!this.clientID.equals(id)) {
- throw new OMException("Unknown client tries to get finalization
status.\n"
- + "The requestor is not the initiating client of the finalization,"
- + " if you want to take over, and get unsent status messages, check"
- + " -takeover option.", INVALID_REQUEST);
- }
- }
-
/**
* This class implements the finalization logic applied to every
* LayoutFeature that needs to be finalized.
@@ -134,26 +92,28 @@ public class OMUpgradeFinalizer implements
UpgradeFinalizer<OzoneManager> {
}
@Override
- public Void call() throws OMException {
+ public Void call() throws IOException {
try {
emitStartingMsg();
+ versionManager.setUpgradeState(FINALIZATION_IN_PROGRESS);
for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
finalizeFeature(f);
- updateLayoutVersionInVersionFile(f);
+ updateLayoutVersionInVersionFile(f, ozoneManager.getOmStorage());
versionManager.finalized(f);
}
versionManager.completeFinalization();
emitFinishedMsg();
+ return null;
} finally {
+ versionManager.setUpgradeState(FINALIZATION_DONE);
isDone = true;
}
- return null;
}
private void finalizeFeature(OMLayoutFeature feature)
- throws OMException {
+ throws IOException {
OmUpgradeAction action = feature.onFinalizeAction().orElse(NOOP);
if (action == NOOP) {
@@ -161,7 +121,7 @@ public class OMUpgradeFinalizer implements
UpgradeFinalizer<OzoneManager> {
return;
}
- putFinalizationMarkIntoVersionFile(feature);
+ putFinalizationMarkIntoVersionFile(feature, ozoneManager.getOmStorage());
emitStartingFinalizationActionMsg(feature.name());
try {
@@ -171,159 +131,8 @@ public class OMUpgradeFinalizer implements
UpgradeFinalizer<OzoneManager> {
}
emitFinishFinalizationActionMsg(feature.name());
- removeFinalizationMarkFromVersionFile(feature);
- }
-
- private void updateLayoutVersionInVersionFile(OMLayoutFeature feature)
- throws OMException {
- int prevLayoutVersion = currentStoredLayoutVersion();
-
- updateStorageLayoutVersion(feature.layoutVersion());
- try {
- persistStorage();
- } catch (IOException e) {
- updateStorageLayoutVersion(prevLayoutVersion);
- logLayoutVersionUpdateFailureAndThrow(e);
- }
- }
-
- private void putFinalizationMarkIntoVersionFile(OMLayoutFeature feature)
- throws OMException {
- try {
- emitUpgradeToLayoutVersionPersistingMsg(feature.name());
-
- setUpgradeToLayoutVersionInStorage(feature.layoutVersion());
- persistStorage();
-
- emitUpgradeToLayoutVersionPersistedMsg();
- } catch (IOException e) {
- logUpgradeToLayoutVersionPersistingFailureAndThrow(feature.name(), e);
- }
- }
-
- private void removeFinalizationMarkFromVersionFile(OMLayoutFeature feature)
- throws OMException {
- try {
- emitRemovingUpgradeToLayoutVersionMsg(feature.name());
-
- unsetUpgradeToLayoutVersionInStorage();
- persistStorage();
-
- emitRemovedUpgradeToLayoutVersionMsg();
- } catch (IOException e) {
- logUpgradeToLayoutVersionRemovalFailureAndThrow(feature.name(), e);
- }
- }
-
-
-
-
-
- private void setUpgradeToLayoutVersionInStorage(int version) {
- ozoneManager.getOmStorage().setUpgradeToLayoutVersion(version);
- }
-
- private void unsetUpgradeToLayoutVersionInStorage() {
- ozoneManager.getOmStorage().unsetUpgradeToLayoutVersion();
- }
-
- private int currentStoredLayoutVersion() {
- return ozoneManager.getOmStorage().getLayoutVersion();
- }
-
- private void updateStorageLayoutVersion(int version) {
- ozoneManager.getOmStorage().setLayoutVersion(version);
- }
-
- private void persistStorage() throws IOException {
- ozoneManager.getOmStorage().persistCurrentState();
- }
-
- private void emitNOOPMsg(String feature) {
- String msg = "No finalization work defined for feature: " + feature +
".";
- String msg2 = "Skipping.";
-
- logAndEmit(msg);
- logAndEmit(msg2);
- }
-
- private void emitStartingMsg() {
- String msg = "Finalization started.";
- logAndEmit(msg);
- }
-
- private void emitFinishedMsg() {
- String msg = "Finalization is done.";
- logAndEmit(msg);
- }
-
- private void emitStartingFinalizationActionMsg(String feature) {
- String msg = "Executing finalization of feature: " + feature + ".";
- logAndEmit(msg);
- }
-
- private void emitFinishFinalizationActionMsg(String feature) {
- String msg = "The feature " + feature + " is finalized.";
- logAndEmit(msg);
- }
-
- private void emitUpgradeToLayoutVersionPersistingMsg(String feature) {
- String msg = "Mark finalization of " + feature + " in VERSION file.";
- logAndEmit(msg);
- }
-
- private void emitUpgradeToLayoutVersionPersistedMsg() {
- String msg = "Finalization mark placed.";
- logAndEmit(msg);
- }
-
- private void emitRemovingUpgradeToLayoutVersionMsg(String feature) {
- String msg = "Remove finalization mark of " + feature
- + " feature from VERSION file.";
- logAndEmit(msg);
- }
-
- private void emitRemovedUpgradeToLayoutVersionMsg() {
- String msg = "Finalization mark removed.";
- logAndEmit(msg);
- }
-
- private void logAndEmit(String msg) {
- LOG.info(msg);
- msgs.offer(msg);
- }
-
- private void logFinalizationFailureAndThrow(Exception e, String feature)
- throws OMException {
- String msg = "Error during finalization of " + feature + ".";
- logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
- }
-
- private void logLayoutVersionUpdateFailureAndThrow(IOException e)
- throws OMException {
- String msg = "Updating the LayoutVersion in the VERSION file failed.";
- logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
- }
-
- private void logUpgradeToLayoutVersionPersistingFailureAndThrow(
- String feature, IOException e
- ) throws OMException {
- String msg = "Failed to update VERSION file with the upgrading feature: "
- + feature + ".";
- logAndThrow(e, msg, PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED);
- }
-
- private void logUpgradeToLayoutVersionRemovalFailureAndThrow(
- String feature, IOException e) throws OMException {
- String msg =
- "Failed to unmark finalization of " + feature + " LayoutFeature.";
- logAndThrow(e, msg, REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED);
- }
-
- private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
- throws OMException {
- LOG.error(msg, e);
- throw new OMException(msg, e, resultCode);
+ removeFinalizationMarkFromVersionFile(feature,
+ ozoneManager.getOmStorage());
}
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
index fbb7ce9..f185f8c 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.om.upgrade;
import org.apache.hadoop.ozone.om.OMStorage;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.upgrade.UpgradeException;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
import org.junit.Rule;
@@ -38,9 +38,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import static
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -73,8 +74,8 @@ public class TestOMUpgradeFinalizer {
@Test
public void testEmitsFinalizedStatusIfAlreadyFinalized() throws Exception {
- when(versionManager.needsFinalization()).thenReturn(false);
+ when(versionManager.getUpgradeState()).thenReturn(ALREADY_FINALIZED);
OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
StatusAndMessages ret = finalizer.finalize(CLIENT_ID, null);
@@ -109,6 +110,10 @@ public class TestOMUpgradeFinalizer {
OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+
+ if (finalizer.isFinalizationDone()) {
+ when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+ }
StatusAndMessages ret = finalizer.reportStatus(CLIENT_ID, false);
assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
@@ -123,6 +128,9 @@ public class TestOMUpgradeFinalizer {
OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+ if (finalizer.isFinalizationDone()) {
+ when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+ }
StatusAndMessages ret = finalizer.reportStatus(OTHER_CLIENT_ID, true);
assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
@@ -137,7 +145,7 @@ public class TestOMUpgradeFinalizer {
OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
- exception.expect(OMException.class);
+ exception.expect(UpgradeException.class);
exception.expectMessage("Unknown client");
finalizer.reportStatus(OTHER_CLIENT_ID, false);
@@ -176,6 +184,9 @@ public class TestOMUpgradeFinalizer {
verify(om.getOmStorage(), once())
.setLayoutVersion(f.layoutVersion());
+ if (finalizer.isFinalizationDone()) {
+ when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+ }
StatusAndMessages status = finalizer.reportStatus(CLIENT_ID, false);
assertEquals(FINALIZATION_DONE, status.status());
assertFalse(status.msgs().isEmpty());
@@ -199,13 +210,16 @@ public class TestOMUpgradeFinalizer {
finalizer.finalize(CLIENT_ID, om);
fail();
} catch (Exception e) {
- assertThat(e, instanceOf(OMException.class));
+ assertThat(e, instanceOf(UpgradeException.class));
assertThat(e.getMessage(), containsString(lfs.iterator().next().name()));
assertEquals(
- ((OMException) e).getResult(),
+ ((UpgradeException) e).getResult(),
LAYOUT_FEATURE_FINALIZATION_FAILED
);
}
+ if (finalizer.isFinalizationDone()) {
+ when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_DONE);
+ }
// Verify that we have never removed the upgradeToLV from the storage
// as finalization of the first feature in the list fails.
@@ -240,6 +254,7 @@ public class TestOMUpgradeFinalizer {
private void setupVersionManagerMockToFinalize(
Iterable<OMLayoutFeature> lfs
) {
+ when(versionManager.getUpgradeState()).thenReturn(FINALIZATION_REQUIRED);
when(versionManager.needsFinalization()).thenReturn(true);
when(versionManager.unfinalizedFeatures()).thenReturn(lfs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]