This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 593337bad9 HDDS-11282. Combine test setup for datanode upgrade unit tests (#7063)
593337bad9 is described below

commit 593337bad94bb28b1495539fd0d9cd31d8a3361c
Author: Chung En Lee <[email protected]>
AuthorDate: Thu Aug 15 17:26:04 2024 +0800

    HDDS-11282. Combine test setup for datanode upgrade unit tests (#7063)
---
 .../hadoop/ozone/container/common/ScmTestMock.java |   8 +-
 .../upgrade/TestDatanodeUpgradeToHBaseSupport.java | 242 ++-----------
 .../upgrade/TestDatanodeUpgradeToSchemaV3.java     | 382 +++++----------------
 .../upgrade/TestDatanodeUpgradeToScmHA.java        | 331 +++++-------------
 .../ozone/container/upgrade/UpgradeTestHelper.java | 271 +++++++++++++++
 5 files changed, 495 insertions(+), 739 deletions(-)

diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 5cb698482a..c333ba1b69 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -74,9 +74,9 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     scmId = UUID.randomUUID().toString();
   }
 
-  public ScmTestMock(String clusterId, String scmId) {
+  public ScmTestMock(String clusterId) {
     this.clusterId = clusterId;
-    this.scmId = scmId;
+    this.scmId = UUID.randomUUID().toString();
   }
 
   // Map of datanode to containers
@@ -368,6 +368,10 @@ public class ScmTestMock implements 
StorageContainerDatanodeProtocol {
     this.clusterId = clusterId;
   }
 
+  public String getScmId() {
+    return scmId;
+  }
+
   public void setScmId(String scmId) {
     this.scmId = scmId;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
index 37bea49b34..55396446d5 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
@@ -20,44 +20,28 @@ package org.apache.hadoop.ozone.container.upgrade;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.ScmTestMock;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import 
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests upgrading a single datanode from HADOOP_PRC_PORTS_IN_DATANODEDETAILS 
to HBASE_SUPPORT.
@@ -67,22 +51,19 @@ public class TestDatanodeUpgradeToHBaseSupport {
   private Path tempFolder;
 
   private DatanodeStateMachine dsm;
+  private ContainerDispatcher dispatcher;
   private OzoneConfiguration conf;
   private static final String CLUSTER_ID = "clusterID";
 
   private RPC.Server scmRpcServer;
   private InetSocketAddress address;
 
-  private Random random;
-
   private void initTests() throws Exception {
     conf = new OzoneConfiguration();
     setup();
   }
 
   private void setup() throws Exception {
-    random = new Random();
-
     address = SCMTestUtils.getReuseableAddress();
     conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
@@ -107,26 +88,30 @@ public class TestDatanodeUpgradeToHBaseSupport {
   public void testIncrementalChunkListBeforeAndAfterUpgrade() throws Exception 
{
     initTests();
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     // Add data to read.
-    final long containerID = addContainer(pipeline);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
     // incremental chunk list should be rejected before finalizing.
-    putBlock(containerID, pipeline, true, 
ContainerProtos.Result.UNSUPPORTED_REQUEST);
+    UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline, true, 
ContainerProtos.Result.UNSUPPORTED_REQUEST);
     Container<?> container = 
dsm.getContainer().getContainerSet().getContainer(containerID);
     assertEquals(OPEN, container.getContainerData().getState());
     // close container to allow upgrade.
-    closeContainer(containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
     dsm.finalizeUpgrade();
     
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
     // open a new container after finalization
-    final long containerID2 = addContainer(pipeline);
+    final long containerID2 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
     // incremental chunk list should work after finalizing.
-    putBlock(containerID2, pipeline, true);
+    UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline, true);
     Container<?> container2 = 
dsm.getContainer().getContainerSet().getContainer(containerID2);
     assertEquals(OPEN, container2.getContainerData().getState());
   }
@@ -138,195 +123,36 @@ public class TestDatanodeUpgradeToHBaseSupport {
   public void testBlockFinalizationBeforeAndAfterUpgrade() throws Exception {
     initTests();
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     // Add data to read.
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID, 
pipeline, false);
-    finalizeBlock(containerID, writeChunk.getBlockID().getLocalID(), 
ContainerProtos.Result.UNSUPPORTED_REQUEST);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline, false);
+    UpgradeTestHelper.finalizeBlock(
+        dispatcher, containerID, writeChunk.getBlockID().getLocalID(), 
ContainerProtos.Result.UNSUPPORTED_REQUEST);
     Container<?> container = 
dsm.getContainer().getContainerSet().getContainer(containerID);
     assertEquals(OPEN, container.getContainerData().getState());
     // close container to allow upgrade.
-    closeContainer(containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
     dsm.finalizeUpgrade();
     
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
-    final long containerID2 = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk2 = 
putBlock(containerID2, pipeline, false);
+    final long containerID2 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk2 =
+        UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline, false);
     // Make sure we can read after finalizing too.
-    finalizeBlock(containerID2, writeChunk2.getBlockID().getLocalID(), 
ContainerProtos.Result.SUCCESS);
+    UpgradeTestHelper.finalizeBlock(
+        dispatcher, containerID2, writeChunk2.getBlockID().getLocalID(), 
ContainerProtos.Result.SUCCESS);
     Container<?> container2 = 
dsm.getContainer().getContainerSet().getContainer(containerID2);
     assertEquals(OPEN, container2.getContainerData().getState());
   }
 
-  /**
-   * Starts the datanode with the fore layout version, and calls the version
-   * endpoint task to get cluster ID and SCM ID.
-   *
-   * The daemon for the datanode state machine is not started in this test.
-   * This greatly speeds up execution time.
-   * It means we do not have heartbeat functionality or pre-finalize
-   * upgrade actions, but neither of those things are needed for these tests.
-   */
-  public void startPreFinalizedDatanode() throws Exception {
-    // Set layout version.
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
-        UUID.randomUUID().toString(),
-        HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
-    layoutStorage.initialize();
-
-    // Build and start the datanode.
-    DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
-    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
-    int actualMlv = 
newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
-    assertEquals(
-        HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion(),
-        actualMlv);
-    if (dsm != null) {
-      dsm.close();
-    }
-    dsm = newDsm;
-
-    callVersionEndpointTask();
-  }
-
-  /**
-   * Get the cluster ID and SCM ID from SCM to the datanode.
-   */
-  public void callVersionEndpointTask() throws Exception {
-    try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
-        address, 1000)) {
-      VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
-          dsm.getContainer());
-      esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
-      vet.call();
-    }
-  }
-
-  public String startScmServer() throws IOException {
-    String scmID = UUID.randomUUID().toString();
-    ScmTestMock scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
-    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
-        scmServerImpl, address, 10);
-    return scmID;
-  }
-
-  /// CONTAINER OPERATIONS ///
-  public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
-      Pipeline pipeline)  throws Exception {
-    ContainerProtos.ContainerCommandRequestProto readChunkRequest =
-        ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
-    dispatchRequest(readChunkRequest);
-  }
-
-  public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
-      Pipeline pipeline, boolean incremental) throws Exception {
-    return putBlock(containerID, pipeline, incremental, 
ContainerProtos.Result.SUCCESS);
-  }
-
-  public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
-      Pipeline pipeline, boolean incremental, ContainerProtos.Result 
expectedResult) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-        getWriteChunk(containerID, pipeline);
-    dispatchRequest(writeChunkRequest);
-
-    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
-        ContainerTestHelper.getPutBlockRequest(pipeline,
-            writeChunkRequest.getWriteChunk(), incremental);
-    dispatchRequest(putBlockRequest, expectedResult);
-
-    return writeChunkRequest.getWriteChunk();
-  }
-
-  public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
-      long containerID, Pipeline pipeline) throws Exception {
-    return ContainerTestHelper.getWriteChunkRequest(pipeline,
-            ContainerTestHelper.getTestBlockID(containerID), 100);
-  }
-
-  public Pipeline getPipeline() {
-    return MockPipeline.createPipeline(
-        Collections.singletonList(dsm.getDatanodeDetails()));
-  }
-
-  public long addContainer(Pipeline pipeline)
-      throws Exception {
-    long containerID = random.nextInt(Integer.MAX_VALUE);
-    ContainerProtos.ContainerCommandRequestProto createContainerRequest =
-        ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
-    dispatchRequest(createContainerRequest);
-
-    return containerID;
-  }
-
-  public void deleteContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
-        ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
-    dispatchRequest(deleteContainerRequest);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline,
-      ContainerProtos.Result expectedResult) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
-        ContainerTestHelper.getCloseContainer(pipeline, containerID);
-    dispatchRequest(closeContainerRequest, expectedResult);
-  }
-
-  public void finalizeBlock(long containerID, long localID, 
ContainerProtos.Result expectedResult) {
-    ContainerInfo container = mock(ContainerInfo.class);
-    when(container.getContainerID()).thenReturn(containerID);
-
-    ContainerProtos.ContainerCommandRequestProto finalizeBlockRequest =
-        ContainerTestHelper.getFinalizeBlockRequest(localID, container, 
UUID.randomUUID().toString());
-
-    dispatchRequest(finalizeBlockRequest, expectedResult);
-  }
-
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request) {
-    dispatchRequest(request, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ContainerProtos.Result expectedResult) {
-    ContainerProtos.ContainerCommandResponseProto response =
-        dsm.getContainer().getDispatcher().dispatch(request, null);
-    assertEquals(expectedResult, response.getResult());
-  }
-
-  /// VOLUME OPERATIONS ///
-
-  /**
-   * Append a datanode volume to the existing volumes in the configuration.
-   * @return The root directory for the new volume.
-   */
-  public File addHddsVolume() throws IOException {
-
-    File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
-        .toString())).toFile();
-    String[] existingVolumes =
-        conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
-    List<String> allVolumes = new ArrayList<>();
-    if (existingVolumes != null) {
-      allVolumes.addAll(Arrays.asList(existingVolumes));
-    }
-
-    allVolumes.add(vol.getAbsolutePath());
-    conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
-        allVolumes.toArray(new String[0]));
-
-    return vol;
-  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
index 23b7da2634..fc599f7f91 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.upgrade;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -29,21 +28,17 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.ScmTestMock;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import 
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.DbVolume;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -52,15 +47,10 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -91,8 +81,6 @@ public class TestDatanodeUpgradeToSchemaV3 {
   private RPC.Server scmRpcServer;
   private InetSocketAddress address;
 
-  private Random random;
-
   private void initTests(Boolean enable) throws Exception {
     boolean schemaV3Enabled = enable;
     conf = new OzoneConfiguration();
@@ -106,8 +94,6 @@ public class TestDatanodeUpgradeToSchemaV3 {
   }
 
   private void setup() throws Exception {
-    random = new Random();
-
     address = SCMTestUtils.getReuseableAddress();
     conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
@@ -135,10 +121,12 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testDBOnHddsVolume(boolean schemaV3Enabled) throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
         .getVolumesList().get(0);
     assertNull(dataVolume.getDbVolume());
@@ -170,11 +158,13 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testDBOnDbVolume(boolean schemaV3Enabled) throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
-    addDbVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    UpgradeTestHelper.addDbVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
         .getVolumesList().get(0);
     assertNull(dataVolume.getDbParentDir());
@@ -209,9 +199,10 @@ public class TestDatanodeUpgradeToSchemaV3 {
       throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
     // add one HddsVolume
-    addHddsVolume();
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
     // Set layout version.
     DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
@@ -232,7 +223,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
     assertNull(dataVolume.getDbParentDir());
 
     // Restart DN and finalize upgrade
-    restartDatanode(
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
     dsm.finalizeUpgrade();
 
@@ -255,13 +246,15 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testFinalizeTwice(boolean schemaV3Enabled) throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
     // add one HddsVolume and two DbVolume
-    addHddsVolume();
-    addDbVolume();
-    addDbVolume();
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    UpgradeTestHelper.addDbVolume(conf, tempFolder);
+    UpgradeTestHelper.addDbVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     dsm.finalizeUpgrade();
 
     DbVolume dbVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
@@ -283,15 +276,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
       throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     dsm.finalizeUpgrade();
 
     // Add a new HddsVolume. It should have DB created after DN restart.
-    addHddsVolume();
-    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
+        HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
         false);
     for (StorageVolume vol:
         dsm.getContainer().getVolumeSet().getVolumesList()) {
@@ -314,10 +310,12 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testAddDbVolumeAfterFinalize(boolean schemaV3Enabled)
       throws Exception {
     initTests(schemaV3Enabled);
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     HddsVolume hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
         .getVolumesList().get(0);
     assertNull(hddsVolume.getDbParentDir());
@@ -328,8 +326,9 @@ public class TestDatanodeUpgradeToSchemaV3 {
         hddsVolume.getStorageDir().getAbsolutePath()));
 
     // Add a new DbVolume
-    addDbVolume();
-    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+    UpgradeTestHelper.addDbVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
+        HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
         false);
 
     // HddsVolume should still use the rocksDB under it's volume
@@ -354,15 +353,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
       throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     dsm.finalizeUpgrade();
 
-    addDbVolume();
-    File newDataVolume = addHddsVolume();
-    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+    UpgradeTestHelper.addDbVolume(conf, tempFolder);
+    File newDataVolume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
+        HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
         false);
 
     DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
@@ -419,18 +421,22 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testWrite(boolean enable, String expectedVersion)
       throws Exception {
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
     // Disable Schema V3
     conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED, false);
-    startPreFinalizedDatanode();
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+    ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
     dsm.finalizeUpgrade();
 
-    final Pipeline pipeline = getPipeline();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
     // Create a container to write data.
-    final long containerID1 = addContainer(pipeline);
-    putBlock(containerID1, pipeline);
-    closeContainer(containerID1, pipeline);
+    final long containerID1 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
+    UpgradeTestHelper.putBlock(dispatcher, containerID1, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID1, pipeline);
     KeyValueContainer container = (KeyValueContainer)
         dsm.getContainer().getContainerSet().getContainer(containerID1);
     // When SchemaV3 is disabled, new data should be saved as SchemaV2.
@@ -440,13 +446,15 @@ public class TestDatanodeUpgradeToSchemaV3 {
     // Set SchemaV3 enable status
     conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
         enable);
-    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, 
address,
+        HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
         false);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     // Write new data
-    final long containerID2 = addContainer(pipeline);
-    putBlock(containerID2, pipeline);
-    closeContainer(containerID2, pipeline);
+    final long containerID2 = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
+    UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID2, pipeline);
     container = (KeyValueContainer)
         dsm.getContainer().getContainerSet().getContainer(containerID2);
     // If SchemaV3 is enabled, new data should be saved as SchemaV3
@@ -464,16 +472,20 @@ public class TestDatanodeUpgradeToSchemaV3 {
       throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, 
address,
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+    ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     // Add data to read.
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
-        pipeline);
-    closeContainer(containerID, pipeline);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, 
pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
     // Create thread to keep reading during finalization.
     ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -481,10 +493,10 @@ public class TestDatanodeUpgradeToSchemaV3 {
       // Layout version check should be thread safe.
       while (!dsm.getLayoutVersionManager()
           .isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
-        readChunk(writeChunk, pipeline);
+        UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
       }
       // Make sure we can read after finalizing too.
-      readChunk(writeChunk, pipeline);
+      UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
       return null;
     });
 
@@ -502,8 +514,9 @@ public class TestDatanodeUpgradeToSchemaV3 {
   public void testFinalizeFailure(boolean schemaV3Enabled) throws Exception {
     initTests(schemaV3Enabled);
     // start DN and SCM
-    startScmServer();
-    addHddsVolume();
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        new ScmTestMock(CLUSTER_ID), address, 10);
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
     // Let HddsVolume be formatted to mimic the real cluster upgrade
     // Set layout version.
     DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
@@ -523,15 +536,17 @@ public class TestDatanodeUpgradeToSchemaV3 {
     assertNull(dataVolume.getDbParentDir());
 
     // Restart DN
-    restartDatanode(
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+    ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
 
     // Write some data.
-    final Pipeline pipeline = getPipeline();
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
-        pipeline);
-    closeContainer(containerID, pipeline);
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
     KeyValueContainer container = (KeyValueContainer)
         dsm.getContainer().getContainerSet().getContainer(containerID);
     assertEquals(OzoneConsts.SCHEMA_V2,
@@ -558,227 +573,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
         dsm.getContainer().getContainerSet().getContainer(containerID);
     assertEquals(OzoneConsts.SCHEMA_V2,
         container.getContainerData().getSchemaVersion());
-    readChunk(writeChunk, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
 
     // SchemaV3 is not finalized, so still ERASURE_CODED_STORAGE_SUPPORT
-    restartDatanode(
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder, address,
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     // Old data is readable after DN restart
     container = (KeyValueContainer)
         dsm.getContainer().getContainerSet().getContainer(containerID);
     assertEquals(OzoneConsts.SCHEMA_V2,
         container.getContainerData().getSchemaVersion());
-    readChunk(writeChunk, pipeline);
-  }
-
-  public void checkContainerPathID(long containerID, String expectedID) {
-    KeyValueContainerData data =
-        (KeyValueContainerData) dsm.getContainer().getContainerSet()
-            .getContainer(containerID).getContainerData();
-    assertThat(data.getChunksPath()).contains(expectedID);
-    assertThat(data.getMetadataPath()).contains(expectedID);
-  }
-
-  public List<File> getHddsSubdirs(File volume) {
-    File[] subdirsArray = getHddsRoot(volume).listFiles(File::isDirectory);
-    assertNotNull(subdirsArray);
-    return Arrays.asList(subdirsArray);
-  }
-
-  public File getHddsRoot(File volume) {
-    return new File(HddsVolumeUtil.getHddsRoot(volume.getAbsolutePath()));
-  }
-
-  /**
-   * Starts the datanode with the fore layout version, and calls the version
-   * endpoint task to get cluster ID and SCM ID.
-   *
-   * The daemon for the datanode state machine is not started in this test.
-   * This greatly speeds up execution time.
-   * It means we do not have heartbeat functionality or pre-finalize
-   * upgrade actions, but neither of those things are needed for these tests.
-   */
-  public void startPreFinalizedDatanode() throws Exception {
-    // Set layout version.
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
-        UUID.randomUUID().toString(),
-        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
-    layoutStorage.initialize();
-
-    // Build and start the datanode.
-    DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
-    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
-    int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
-    assertEquals(
-        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(),
-        actualMlv);
-    if (dsm != null) {
-      dsm.close();
-    }
-    dsm = newDsm;
-
-    callVersionEndpointTask();
-  }
-
-  public void restartDatanode(int expectedMlv, boolean exactMatch)
-      throws Exception {
-    // Stop existing datanode.
-    DatanodeDetails dd = dsm.getDatanodeDetails();
-    dsm.close();
-
-    // Start new datanode with the same configuration.
-    dsm = new DatanodeStateMachine(dd, conf);
-    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
-    if (exactMatch) {
-      assertEquals(expectedMlv, mlv);
-    } else {
-      assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
-    }
-
-    callVersionEndpointTask();
-  }
-
-  /**
-   * Get the cluster ID and SCM ID from SCM to the datanode.
-   */
-  public void callVersionEndpointTask() throws Exception {
-    try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
-        address, 1000)) {
-      VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
-          dsm.getContainer());
-      esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
-      vet.call();
-    }
-  }
-
-  public String startScmServer() throws IOException {
-    String scmID = UUID.randomUUID().toString();
-    ScmTestMock scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
-    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
-        scmServerImpl, address, 10);
-    return scmID;
-  }
-
-  /// CONTAINER OPERATIONS ///
-  public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
-      Pipeline pipeline)  throws Exception {
-    ContainerProtos.ContainerCommandRequestProto readChunkRequest =
-        ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
-    dispatchRequest(readChunkRequest);
-  }
-
-  public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
-      Pipeline pipeline) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-        getWriteChunk(containerID, pipeline);
-    dispatchRequest(writeChunkRequest);
-
-    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
-        ContainerTestHelper.getPutBlockRequest(pipeline,
-            writeChunkRequest.getWriteChunk());
-    dispatchRequest(putBlockRequest);
-
-    return writeChunkRequest.getWriteChunk();
-  }
-
-  public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
-      long containerID, Pipeline pipeline) throws Exception {
-    return ContainerTestHelper.getWriteChunkRequest(pipeline,
-            ContainerTestHelper.getTestBlockID(containerID), 100);
-  }
-
-  public Pipeline getPipeline() {
-    return MockPipeline.createPipeline(
-        Collections.singletonList(dsm.getDatanodeDetails()));
-  }
-
-  public long addContainer(Pipeline pipeline)
-      throws Exception {
-    long containerID = random.nextInt(Integer.MAX_VALUE);
-    ContainerProtos.ContainerCommandRequestProto createContainerRequest =
-        ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
-    dispatchRequest(createContainerRequest);
-
-    return containerID;
-  }
-
-  public void deleteContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
-        ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
-    dispatchRequest(deleteContainerRequest);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline,
-      ContainerProtos.Result expectedResult) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
-        ContainerTestHelper.getCloseContainer(pipeline, containerID);
-    dispatchRequest(closeContainerRequest, expectedResult);
-  }
-
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request) {
-    dispatchRequest(request, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ContainerProtos.Result expectedResult) {
-    ContainerProtos.ContainerCommandResponseProto response =
-        dsm.getContainer().getDispatcher().dispatch(request, null);
-    assertEquals(expectedResult, response.getResult());
-  }
-
-  /// VOLUME OPERATIONS ///
-
-  /**
-   * Append a datanode volume to the existing volumes in the configuration.
-   * @return The root directory for the new volume.
-   */
-  public File addHddsVolume() throws IOException {
-
-    File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
-        .toString())).toFile();
-    String[] existingVolumes =
-        conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
-    List<String> allVolumes = new ArrayList<>();
-    if (existingVolumes != null) {
-      allVolumes.addAll(Arrays.asList(existingVolumes));
-    }
-
-    allVolumes.add(vol.getAbsolutePath());
-    conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
-        allVolumes.toArray(new String[0]));
-
-    return vol;
-  }
-
-  /**
-   * Append a db volume to the existing volumes in the configuration.
-   * @return The root directory for the new volume.
-   */
-  public File addDbVolume() throws Exception {
-    File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
-        .toString())).toFile();
-    String[] existingVolumes =
-        conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
-    List<String> allVolumes = new ArrayList<>();
-    if (existingVolumes != null) {
-      allVolumes.addAll(Arrays.asList(existingVolumes));
-    }
-
-    allVolumes.add(vol.getAbsolutePath());
-    conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
-        allVolumes.toArray(new String[0]));
-
-    return vol;
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 59b88bcbea..d4a27e74cd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.ozone.container.upgrade;
 
-import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -28,17 +26,12 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.container.common.ScmTestMock;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.replication.ContainerImporter;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
@@ -55,11 +48,9 @@ import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -81,6 +72,7 @@ public class TestDatanodeUpgradeToScmHA {
   private Path tempFolder;
 
   private DatanodeStateMachine dsm;
+  private ContainerDispatcher dispatcher;
   private OzoneConfiguration conf;
   private static final String CLUSTER_ID = "clusterID";
   private boolean scmHAAlreadyEnabled;
@@ -89,8 +81,6 @@ public class TestDatanodeUpgradeToScmHA {
   private InetSocketAddress address;
   private ScmTestMock scmServerImpl;
 
-  private Random random;
-
   private void setScmHAEnabled(boolean enableSCMHA)
       throws Exception {
     this.scmHAAlreadyEnabled = enableSCMHA;
@@ -100,8 +90,6 @@ public class TestDatanodeUpgradeToScmHA {
   }
 
   private void setup() throws Exception {
-    random = new Random();
-
     address = SCMTestUtils.getReuseableAddress();
     conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
   }
@@ -124,15 +112,18 @@ public class TestDatanodeUpgradeToScmHA {
     setScmHAEnabled(enableSCMHA);
     // start DN and SCM
     startScmServer();
-    addVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     // Add data to read.
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
-        pipeline);
-    closeContainer(containerID, pipeline);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
 
     // Create thread to keep reading during finalization.
     ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -140,10 +131,10 @@ public class TestDatanodeUpgradeToScmHA {
       // Layout version check should be thread safe.
       while (!dsm.getLayoutVersionManager()
           .isAllowed(HDDSLayoutFeature.SCM_HA)) {
-        readChunk(writeChunk, pipeline);
+        UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
       }
       // Make sure we can read after finalizing too.
-      readChunk(writeChunk, pipeline);
+      UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
       return null;
     });
 
@@ -159,40 +150,45 @@ public class TestDatanodeUpgradeToScmHA {
     setScmHAEnabled(enableSCMHA);
     // start DN and SCM
     startScmServer();
-    addVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     // Pre-export a container to continuously import and delete.
-    final long exportContainerID = addContainer(pipeline);
+    final long exportContainerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
     ContainerProtos.WriteChunkRequestProto exportWriteChunk =
-        putBlock(exportContainerID, pipeline);
-    closeContainer(exportContainerID, pipeline);
+        UpgradeTestHelper.putBlock(dispatcher, exportContainerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, exportContainerID, pipeline);
     File exportedContainerFile = exportContainer(exportContainerID);
-    deleteContainer(exportContainerID, pipeline);
+    UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID, pipeline);
 
     // Export another container to import while pre-finalized and read
     // finalized.
-    final long exportContainerID2 = addContainer(pipeline);
+    final long exportContainerID2 = UpgradeTestHelper.addContainer(dispatcher, pipeline);
     ContainerProtos.WriteChunkRequestProto exportWriteChunk2 =
-        putBlock(exportContainerID2, pipeline);
-    closeContainer(exportContainerID2, pipeline);
+        UpgradeTestHelper.putBlock(dispatcher, exportContainerID2, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, exportContainerID2, pipeline);
     File exportedContainerFile2 = exportContainer(exportContainerID2);
-    deleteContainer(exportContainerID2, pipeline);
+    UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID2, pipeline);
 
     // Make sure we can import and read a container pre-finalized.
     importContainer(exportContainerID2, exportedContainerFile2);
-    readChunk(exportWriteChunk2, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
 
     // Now SCM and enough other DNs finalize to enable SCM HA. This DN is
     // restarted with SCM HA config and gets a different SCM ID.
     conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
     changeScmID();
 
-    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     // Make sure the existing container can be read.
-    readChunk(exportWriteChunk2, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
 
     // Create thread to keep importing containers during the upgrade.
     // Since the datanode's MLV is behind SCM's, container creation is not
@@ -204,12 +200,12 @@ public class TestDatanodeUpgradeToScmHA {
       while (!dsm.getLayoutVersionManager()
           .isAllowed(HDDSLayoutFeature.SCM_HA)) {
         importContainer(exportContainerID, exportedContainerFile);
-        readChunk(exportWriteChunk, pipeline);
-        deleteContainer(exportContainerID, pipeline);
+        UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk, pipeline);
+        UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID, pipeline);
       }
       // Make sure we can import after finalizing too.
       importContainer(exportContainerID, exportedContainerFile);
-      readChunk(exportWriteChunk, pipeline);
+      UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk, pipeline);
       return null;
     });
 
@@ -220,7 +216,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     // Make sure we can read the container that was imported while
     // pre-finalized after finalizing.
-    readChunk(exportWriteChunk2, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
   }
 
   @ParameterizedTest(name = "{index}: scmHAAlreadyEnabled={0}")
@@ -230,10 +226,14 @@ public class TestDatanodeUpgradeToScmHA {
     setScmHAEnabled(enableSCMHA);
     /// SETUP ///
 
-    String originalScmID = startScmServer();
-    File volume = addVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    startScmServer();
+    String originalScmID = scmServerImpl.getScmId();
+    File volume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     /// PRE-FINALIZED: Write and Read from formatted volume ///
 
@@ -243,10 +243,10 @@ public class TestDatanodeUpgradeToScmHA {
         dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
 
     // Add container with data, make sure it can be read and written.
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
-        pipeline);
-    readChunk(writeChunk, pipeline);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
 
     checkPreFinalizedVolumePathID(volume, originalScmID, CLUSTER_ID);
     checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
@@ -256,7 +256,7 @@ public class TestDatanodeUpgradeToScmHA {
     failVolume(volume);
     // Since volume is failed, container should be marked unhealthy.
     // Finalization should proceed anyways.
-    closeContainer(containerID, pipeline,
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline,
         ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR);
     State containerState = dsm.getContainer().getContainerSet()
         .getContainer(containerID).getContainerState();
@@ -286,11 +286,13 @@ public class TestDatanodeUpgradeToScmHA {
     // imported to it.
     // This should log a warning about reading from an unhealthy container
     // but otherwise proceed successfully.
-    readChunk(writeChunk, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
 
     /// FINALIZED: Restart datanode to upgrade the failed volume ///
 
-    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder, address,
+        HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     assertEquals(1,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -301,12 +303,12 @@ public class TestDatanodeUpgradeToScmHA {
     checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
 
     // Read container from before upgrade. The upgrade required it to be closed.
-    readChunk(writeChunk, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
     // Write and read container after upgrade.
-    long newContainerID = addContainer(pipeline);
+    long newContainerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
     ContainerProtos.WriteChunkRequestProto newWriteChunk =
-        putBlock(newContainerID, pipeline);
-    readChunk(newWriteChunk, pipeline);
+        UpgradeTestHelper.putBlock(dispatcher, newContainerID, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, newWriteChunk, pipeline);
     // The new container should use cluster ID in its path.
     // The volume it is placed on is up to the implementation.
     checkContainerPathID(newContainerID, CLUSTER_ID);
@@ -318,10 +320,14 @@ public class TestDatanodeUpgradeToScmHA {
     setScmHAEnabled(enableSCMHA);
     /// SETUP ///
 
-    String originalScmID = startScmServer();
-    File preFinVolume1 = addVolume();
-    startPreFinalizedDatanode();
-    final Pipeline pipeline = getPipeline();
+    startScmServer();
+    String originalScmID = scmServerImpl.getScmId();
+    File preFinVolume1 = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+    dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+    dispatcher = dsm.getContainer().getDispatcher();
+    final Pipeline pipeline = MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
 
     /// PRE-FINALIZED: Write and Read from formatted volume ///
 
@@ -331,10 +337,10 @@ public class TestDatanodeUpgradeToScmHA {
         dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
 
     // Add container with data, make sure it can be read and written.
-    final long containerID = addContainer(pipeline);
-    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
-        pipeline);
-    readChunk(writeChunk, pipeline);
+    final long containerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk =
+        UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
 
     checkPreFinalizedVolumePathID(preFinVolume1, originalScmID, CLUSTER_ID);
     checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
@@ -346,9 +352,11 @@ public class TestDatanodeUpgradeToScmHA {
     conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
     changeScmID();
     // A new volume is added that must be formatted.
-    File preFinVolume2 = addVolume();
+    File preFinVolume2 = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
 
-    restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder, address,
+        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     assertEquals(2,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -356,7 +364,7 @@ public class TestDatanodeUpgradeToScmHA {
         dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
 
     // Because DN mlv would be behind SCM mlv, only reads are allowed.
-    readChunk(writeChunk, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
 
     // On restart, there should have been no changes to the paths already used.
     checkPreFinalizedVolumePathID(preFinVolume1, originalScmID, CLUSTER_ID);
@@ -369,7 +377,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     /// FINALIZE ///
 
-    closeContainer(containerID, pipeline);
+    UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
     dsm.finalizeUpgrade();
     LambdaTestUtils.await(2000, 500,
         () -> dsm.getLayoutVersionManager()
@@ -379,11 +387,13 @@ public class TestDatanodeUpgradeToScmHA {
 
     // Add a new volume that should be formatted with cluster ID only, since
     // DN has finalized.
-    File finVolume = addVolume();
+    File finVolume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
     // Yet another SCM ID is received this time, but it should not matter.
     changeScmID();
 
-    restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+    dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder, address,
+        HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+    dispatcher = dsm.getContainer().getDispatcher();
 
     assertEquals(3,
         dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -400,12 +410,12 @@ public class TestDatanodeUpgradeToScmHA {
     /// FINALIZED: Read old data and write + read new data ///
 
     // Read container from before upgrade. The upgrade required it to be closed.
-    readChunk(writeChunk, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
     // Write and read container after upgrade.
-    long newContainerID = addContainer(pipeline);
+    long newContainerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
     ContainerProtos.WriteChunkRequestProto newWriteChunk =
-        putBlock(newContainerID, pipeline);
-    readChunk(newWriteChunk, pipeline);
+        UpgradeTestHelper.putBlock(dispatcher, newContainerID, pipeline);
+    UpgradeTestHelper.readChunk(dispatcher, newWriteChunk, pipeline);
     // The new container should use cluster ID in its path.
     // The volume it is placed on is up to the implementation.
     checkContainerPathID(newContainerID, CLUSTER_ID);
@@ -496,82 +506,18 @@ public class TestDatanodeUpgradeToScmHA {
 
   /// CLUSTER OPERATIONS ///
 
-  /**
-   * Starts the datanode with the first layout version, and calls the version
-   * endpoint task to get cluster ID and SCM ID.
-   *
-   * The daemon for the datanode state machine is not started in this test.
-   * This greatly speeds up execution time.
-   * It means we do not have heartbeat functionality or pre-finalize
-   * upgrade actions, but neither of those things are needed for these tests.
-   */
-  public void startPreFinalizedDatanode() throws Exception {
-    // Set layout version.
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
-        tempFolder.toString());
-    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
-        UUID.randomUUID().toString(),
-        HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
-    layoutStorage.initialize();
-
-    // Build and start the datanode.
-    DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
-    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
-    int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
-    assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
-        actualMlv);
-    dsm = newDsm;
-
-    callVersionEndpointTask();
-  }
-
-  public void restartDatanode(int expectedMlv, boolean exactMatch)
-      throws Exception {
-    // Stop existing datanode.
-    DatanodeDetails dd = dsm.getDatanodeDetails();
-    dsm.close();
-
-    // Start new datanode with the same configuration.
-    dsm = new DatanodeStateMachine(dd, conf);
-    StorageVolumeUtil.getHddsVolumesList(dsm.getContainer().getVolumeSet().getVolumesList())
-        .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));
-    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
-    if (exactMatch) {
-      assertEquals(expectedMlv, mlv);
-    } else {
-      assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
-    }
-
-    callVersionEndpointTask();
-  }
-
-  /**
-   * Get the cluster ID and SCM ID from SCM to the datanode.
-   */
-  public void callVersionEndpointTask() throws Exception {
-    try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
-        address, 1000)) {
-      VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
-          dsm.getContainer());
-      esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
-      vet.call();
-    }
-  }
-
-  public String startScmServer() throws Exception {
-    String scmID = UUID.randomUUID().toString();
-    scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
+  private void startScmServer() throws Exception {
+    scmServerImpl = new ScmTestMock(CLUSTER_ID);
     scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
         scmServerImpl, address, 10);
-    return scmID;
   }
 
   /**
    * Updates the SCM ID on the SCM server. Datanode will not be aware of this
-   * until {@link this#callVersionEndpointTask} is called.
+   * until {@link UpgradeTestHelper#callVersionEndpointTask} is called.
    * @return the new scm ID.
    */
-  public String changeScmID() {
+  private String changeScmID() {
     String scmID = UUID.randomUUID().toString();
     scmServerImpl.setScmId(scmID);
     return scmID;
@@ -579,72 +525,10 @@ public class TestDatanodeUpgradeToScmHA {
 
   /// CONTAINER OPERATIONS ///
 
-  public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
-      Pipeline pipeline)  throws Exception {
-    ContainerProtos.ContainerCommandRequestProto readChunkRequest =
-        ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
-    dispatchRequest(readChunkRequest);
-  }
-
-  public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
-      Pipeline pipeline) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-        getWriteChunk(containerID, pipeline);
-    dispatchRequest(writeChunkRequest);
-
-    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
-        ContainerTestHelper.getPutBlockRequest(pipeline,
-            writeChunkRequest.getWriteChunk());
-    dispatchRequest(putBlockRequest);
-
-    return writeChunkRequest.getWriteChunk();
-  }
-
-  public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
-      long containerID, Pipeline pipeline) throws Exception {
-    return ContainerTestHelper.getWriteChunkRequest(pipeline,
-            ContainerTestHelper.getTestBlockID(containerID), 100);
-  }
-
-  public Pipeline getPipeline() {
-    return MockPipeline.createPipeline(
-        Collections.singletonList(dsm.getDatanodeDetails()));
-  }
-
-  public long addContainer(Pipeline pipeline)
-      throws Exception {
-    long containerID = random.nextInt(Integer.MAX_VALUE);
-    ContainerProtos.ContainerCommandRequestProto createContainerRequest =
-        ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
-    dispatchRequest(createContainerRequest);
-
-    return containerID;
-  }
-
-  public void deleteContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
-        ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
-    dispatchRequest(deleteContainerRequest);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline)
-      throws Exception {
-    closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void closeContainer(long containerID, Pipeline pipeline,
-      ContainerProtos.Result expectedResult) throws Exception {
-    ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
-        ContainerTestHelper.getCloseContainer(pipeline, containerID);
-    dispatchRequest(closeContainerRequest, expectedResult);
-  }
-
   /**
    * Exports the specified container to a temporary file and returns the file.
    */
-  public File exportContainer(long containerId) throws Exception {
+  private File exportContainer(long containerId) throws Exception {
     final ContainerReplicationSource replicationSource =
         new OnDemandContainerReplicationSource(
             dsm.getContainer().getController());
@@ -663,7 +547,7 @@ public class TestDatanodeUpgradeToScmHA {
    * Imports the container found in {@code source} to the datanode with the ID
    * {@code containerID}.
    */
-  public void importContainer(long containerID, File source) throws Exception {
+  private void importContainer(long containerID, File source) throws Exception {
     ContainerImporter replicator =
         new ContainerImporter(dsm.getConf(),
             dsm.getContainer().getContainerSet(),
@@ -679,43 +563,8 @@ public class TestDatanodeUpgradeToScmHA {
         NO_COMPRESSION);
   }
 
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request) {
-    dispatchRequest(request, ContainerProtos.Result.SUCCESS);
-  }
-
-  public void dispatchRequest(
-      ContainerProtos.ContainerCommandRequestProto request,
-      ContainerProtos.Result expectedResult) {
-    ContainerProtos.ContainerCommandResponseProto response =
-        dsm.getContainer().getDispatcher().dispatch(request, null);
-    assertEquals(expectedResult, response.getResult());
-  }
-
   /// VOLUME OPERATIONS ///
 
-  /**
-   * Append a datanode volume to the existing volumes in the configuration.
-   * @return The root directory for the new volume.
-   */
-  public File addVolume() throws Exception {
-
-    File vol = Files.createDirectory(
-        tempFolder.resolve(UUID.randomUUID().toString())).toFile();
-    String[] existingVolumes =
-        conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
-    List<String> allVolumes = new ArrayList<>();
-    if (existingVolumes != null) {
-      allVolumes.addAll(Arrays.asList(existingVolumes));
-    }
-
-    allVolumes.add(vol.getAbsolutePath());
-    conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
-        allVolumes.toArray(new String[0]));
-
-    return vol;
-  }
-
   /**
    * Renames the specified volume directory so it will appear as failed to
    * the datanode.
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
new file mode 100644
index 0000000000..28b9163f3c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Helpers for upgrade tests.
+ */
+public final class UpgradeTestHelper {
+  private UpgradeTestHelper() {
+  }
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Starts the datanode with the given metadata layout version, and calls the version
+   * endpoint task to get cluster ID and SCM ID.
+   *
+   * The daemon for the datanode state machine is not started in this test.
+   * This greatly speeds up execution time.
+   * It means we do not have heartbeat functionality or pre-finalize
+   * upgrade actions, but neither of those things are needed for these tests.
+   */
+  public static DatanodeStateMachine startPreFinalizedDatanode(
+      OzoneConfiguration conf, Path tempFolder,
+      DatanodeStateMachine dsm, InetSocketAddress address,
+      int metadataLayoutVersion)
+      throws Exception {
+    // Set layout version.
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
+    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+        UUID.randomUUID().toString(),
+        metadataLayoutVersion);
+    layoutStorage.initialize();
+    if (dsm != null) {
+      dsm.close();
+    }
+
+    // Build and start the datanode.
+    DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
+    dsm = new DatanodeStateMachine(dd, conf);
+    int actualMlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    assertEquals(
+        metadataLayoutVersion,
+        actualMlv);
+
+
+    callVersionEndpointTask(conf, dsm.getContainer(), address);
+    return dsm;
+  }
+
+  public static DatanodeStateMachine restartDatanode(
+      OzoneConfiguration conf, DatanodeStateMachine dsm, boolean shouldSetDbParentDir,
+      Path tempFolder, InetSocketAddress address, int expectedMlv, boolean exactMatch)
+      throws Exception {
+    // Stop existing datanode.
+    DatanodeDetails dd = dsm.getDatanodeDetails();
+    dsm.close();
+
+    // Start new datanode with the same configuration.
+    dsm = new DatanodeStateMachine(dd, conf);
+    if (shouldSetDbParentDir) {
+      StorageVolumeUtil.getHddsVolumesList(dsm.getContainer().getVolumeSet().getVolumesList())
+          .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));
+    }
+    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    if (exactMatch) {
+      assertEquals(expectedMlv, mlv);
+    } else {
+      assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
+    }
+
+    callVersionEndpointTask(conf, dsm.getContainer(), address);
+    return dsm;
+  }
+
+  /**
+   * Get the cluster ID and SCM ID from SCM to the datanode.
+   */
+  public static void callVersionEndpointTask(
+      OzoneConfiguration conf, OzoneContainer container, InetSocketAddress address)
+      throws Exception {
+    try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
+        address, 1000)) {
+      VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
+          container);
+      esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      vet.call();
+    }
+  }
+
+  /**
+   * Append a datanode volume to the existing volumes in the configuration.
+   * @return The root directory for the new volume.
+   */
+  public static File addHddsVolume(OzoneConfiguration conf, Path tempFolder) throws IOException {
+
+    File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
+        .toString())).toFile();
+    String[] existingVolumes =
+        conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
+    List<String> allVolumes = new ArrayList<>();
+    if (existingVolumes != null) {
+      allVolumes.addAll(Arrays.asList(existingVolumes));
+    }
+
+    allVolumes.add(vol.getAbsolutePath());
+    conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        allVolumes.toArray(new String[0]));
+
+    return vol;
+  }
+
+  /**
+   * Append a db volume to the existing volumes in the configuration.
+   * @return The root directory for the new volume.
+   */
+  public static File addDbVolume(OzoneConfiguration conf, Path tempFolder) throws Exception {
+    File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
+        .toString())).toFile();
+    String[] existingVolumes =
+        conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
+    List<String> allVolumes = new ArrayList<>();
+    if (existingVolumes != null) {
+      allVolumes.addAll(Arrays.asList(existingVolumes));
+    }
+
+    allVolumes.add(vol.getAbsolutePath());
+    conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        allVolumes.toArray(new String[0]));
+
+    return vol;
+  }
+
+
+  public static void dispatchRequest(
+      ContainerDispatcher dispatcher,
+      ContainerProtos.ContainerCommandRequestProto request) {
+    dispatchRequest(dispatcher, request, ContainerProtos.Result.SUCCESS);
+  }
+
+  public static void dispatchRequest(
+      ContainerDispatcher dispatcher, ContainerProtos.ContainerCommandRequestProto request,
+      ContainerProtos.Result expectedResult) {
+    ContainerProtos.ContainerCommandResponseProto response =
+        dispatcher.dispatch(request, null);
+    assertEquals(expectedResult, response.getResult());
+  }
+
+  public static void readChunk(
+      ContainerDispatcher dispatcher, ContainerProtos.WriteChunkRequestProto writeChunk,
+      Pipeline pipeline) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+        ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
+    dispatchRequest(dispatcher, readChunkRequest);
+  }
+
+  public static ContainerProtos.WriteChunkRequestProto putBlock(
+      ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+      boolean incremental) throws Exception {
+    return putBlock(dispatcher, containerID, pipeline, incremental, ContainerProtos.Result.SUCCESS);
+  }
+
+  public static ContainerProtos.WriteChunkRequestProto putBlock(
+      ContainerDispatcher dispatcher, long containerID, Pipeline pipeline) throws Exception {
+    return putBlock(dispatcher, containerID, pipeline, false, ContainerProtos.Result.SUCCESS);
+  }
+
+  public static ContainerProtos.WriteChunkRequestProto putBlock(
+      ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+      boolean incremental, ContainerProtos.Result expectedResult) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        ContainerTestHelper.getWriteChunkRequest(pipeline,
+            ContainerTestHelper.getTestBlockID(containerID), 100);
+    dispatchRequest(dispatcher, writeChunkRequest);
+
+    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+        ContainerTestHelper.getPutBlockRequest(pipeline,
+            writeChunkRequest.getWriteChunk(), incremental);
+    dispatchRequest(dispatcher, putBlockRequest, expectedResult);
+    return writeChunkRequest.getWriteChunk();
+  }
+
+  public static long addContainer(ContainerDispatcher dispatcher, Pipeline pipeline)
+      throws Exception {
+    long containerID = RANDOM.nextInt(Integer.MAX_VALUE);
+    ContainerProtos.ContainerCommandRequestProto createContainerRequest =
+        ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
+    dispatchRequest(dispatcher, createContainerRequest);
+
+    return containerID;
+  }
+
+  public static void deleteContainer(ContainerDispatcher dispatcher, long containerID, Pipeline pipeline)
+      throws Exception {
+    ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
+        ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
+    dispatchRequest(dispatcher, deleteContainerRequest);
+  }
+
+  public static void closeContainer(ContainerDispatcher dispatcher, long containerID, Pipeline pipeline)
+      throws Exception {
+    closeContainer(dispatcher, containerID, pipeline, ContainerProtos.Result.SUCCESS);
+  }
+
+  public static void closeContainer(
+      ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+      ContainerProtos.Result expectedResult) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
+        ContainerTestHelper.getCloseContainer(pipeline, containerID);
+    dispatchRequest(dispatcher, closeContainerRequest, expectedResult);
+  }
+
+  public static void finalizeBlock(
+      ContainerDispatcher dispatcher, long containerID, long localID, ContainerProtos.Result expectedResult) {
+    ContainerInfo container = mock(ContainerInfo.class);
+    when(container.getContainerID()).thenReturn(containerID);
+
+    ContainerProtos.ContainerCommandRequestProto finalizeBlockRequest =
+        ContainerTestHelper.getFinalizeBlockRequest(localID, container, UUID.randomUUID().toString());
+
+    UpgradeTestHelper.dispatchRequest(dispatcher, finalizeBlockRequest, expectedResult);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to