This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 593337bad9 HDDS-11282. Combine test setup for datanode upgrade unit
tests (#7063)
593337bad9 is described below
commit 593337bad94bb28b1495539fd0d9cd31d8a3361c
Author: Chung En Lee <[email protected]>
AuthorDate: Thu Aug 15 17:26:04 2024 +0800
HDDS-11282. Combine test setup for datanode upgrade unit tests (#7063)
---
.../hadoop/ozone/container/common/ScmTestMock.java | 8 +-
.../upgrade/TestDatanodeUpgradeToHBaseSupport.java | 242 ++-----------
.../upgrade/TestDatanodeUpgradeToSchemaV3.java | 382 +++++----------------
.../upgrade/TestDatanodeUpgradeToScmHA.java | 331 +++++-------------
.../ozone/container/upgrade/UpgradeTestHelper.java | 271 +++++++++++++++
5 files changed, 495 insertions(+), 739 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 5cb698482a..c333ba1b69 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -74,9 +74,9 @@ public class ScmTestMock implements
StorageContainerDatanodeProtocol {
scmId = UUID.randomUUID().toString();
}
- public ScmTestMock(String clusterId, String scmId) {
+ public ScmTestMock(String clusterId) {
this.clusterId = clusterId;
- this.scmId = scmId;
+ this.scmId = UUID.randomUUID().toString();
}
// Map of datanode to containers
@@ -368,6 +368,10 @@ public class ScmTestMock implements
StorageContainerDatanodeProtocol {
this.clusterId = clusterId;
}
+ public String getScmId() {
+ return scmId;
+ }
+
public void setScmId(String scmId) {
this.scmId = scmId;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
index 37bea49b34..55396446d5 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToHBaseSupport.java
@@ -20,44 +20,28 @@ package org.apache.hadoop.ozone.container.upgrade;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Tests upgrading a single datanode from HADOOP_PRC_PORTS_IN_DATANODEDETAILS
to HBASE_SUPPORT.
@@ -67,22 +51,19 @@ public class TestDatanodeUpgradeToHBaseSupport {
private Path tempFolder;
private DatanodeStateMachine dsm;
+ private ContainerDispatcher dispatcher;
private OzoneConfiguration conf;
private static final String CLUSTER_ID = "clusterID";
private RPC.Server scmRpcServer;
private InetSocketAddress address;
- private Random random;
-
private void initTests() throws Exception {
conf = new OzoneConfiguration();
setup();
}
private void setup() throws Exception {
- random = new Random();
-
address = SCMTestUtils.getReuseableAddress();
conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
@@ -107,26 +88,30 @@ public class TestDatanodeUpgradeToHBaseSupport {
public void testIncrementalChunkListBeforeAndAfterUpgrade() throws Exception
{
initTests();
// start DN and SCM
- startScmServer();
- addHddsVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Add data to read.
- final long containerID = addContainer(pipeline);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
// incremental chunk list should be rejected before finalizing.
- putBlock(containerID, pipeline, true,
ContainerProtos.Result.UNSUPPORTED_REQUEST);
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline, true,
ContainerProtos.Result.UNSUPPORTED_REQUEST);
Container<?> container =
dsm.getContainer().getContainerSet().getContainer(containerID);
assertEquals(OPEN, container.getContainerData().getState());
// close container to allow upgrade.
- closeContainer(containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
dsm.finalizeUpgrade();
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
// open a new container after finalization
- final long containerID2 = addContainer(pipeline);
+ final long containerID2 = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
// incremental chunk list should work after finalizing.
- putBlock(containerID2, pipeline, true);
+ UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline, true);
Container<?> container2 =
dsm.getContainer().getContainerSet().getContainer(containerID2);
assertEquals(OPEN, container2.getContainerData().getState());
}
@@ -138,195 +123,36 @@ public class TestDatanodeUpgradeToHBaseSupport {
public void testBlockFinalizationBeforeAndAfterUpgrade() throws Exception {
initTests();
// start DN and SCM
- startScmServer();
- addHddsVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Add data to read.
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
pipeline, false);
- finalizeBlock(containerID, writeChunk.getBlockID().getLocalID(),
ContainerProtos.Result.UNSUPPORTED_REQUEST);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline, false);
+ UpgradeTestHelper.finalizeBlock(
+ dispatcher, containerID, writeChunk.getBlockID().getLocalID(),
ContainerProtos.Result.UNSUPPORTED_REQUEST);
Container<?> container =
dsm.getContainer().getContainerSet().getContainer(containerID);
assertEquals(OPEN, container.getContainerData().getState());
// close container to allow upgrade.
- closeContainer(containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
dsm.finalizeUpgrade();
assertTrue(dsm.getLayoutVersionManager().isAllowed(HDDSLayoutFeature.HBASE_SUPPORT));
- final long containerID2 = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk2 =
putBlock(containerID2, pipeline, false);
+ final long containerID2 = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk2 =
+ UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline, false);
// Make sure we can read after finalizing too.
- finalizeBlock(containerID2, writeChunk2.getBlockID().getLocalID(),
ContainerProtos.Result.SUCCESS);
+ UpgradeTestHelper.finalizeBlock(
+ dispatcher, containerID2, writeChunk2.getBlockID().getLocalID(),
ContainerProtos.Result.SUCCESS);
Container<?> container2 =
dsm.getContainer().getContainerSet().getContainer(containerID2);
assertEquals(OPEN, container2.getContainerData().getState());
}
- /**
- * Starts the datanode with the fore layout version, and calls the version
- * endpoint task to get cluster ID and SCM ID.
- *
- * The daemon for the datanode state machine is not started in this test.
- * This greatly speeds up execution time.
- * It means we do not have heartbeat functionality or pre-finalize
- * upgrade actions, but neither of those things are needed for these tests.
- */
- public void startPreFinalizedDatanode() throws Exception {
- // Set layout version.
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
- DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
- UUID.randomUUID().toString(),
- HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion());
- layoutStorage.initialize();
-
- // Build and start the datanode.
- DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
- DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
- int actualMlv =
newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
- assertEquals(
- HDDSLayoutFeature.HADOOP_PRC_PORTS_IN_DATANODEDETAILS.layoutVersion(),
- actualMlv);
- if (dsm != null) {
- dsm.close();
- }
- dsm = newDsm;
-
- callVersionEndpointTask();
- }
-
- /**
- * Get the cluster ID and SCM ID from SCM to the datanode.
- */
- public void callVersionEndpointTask() throws Exception {
- try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
- address, 1000)) {
- VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
- dsm.getContainer());
- esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
- vet.call();
- }
- }
-
- public String startScmServer() throws IOException {
- String scmID = UUID.randomUUID().toString();
- ScmTestMock scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
- scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
- scmServerImpl, address, 10);
- return scmID;
- }
-
- /// CONTAINER OPERATIONS ///
- public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
- Pipeline pipeline) throws Exception {
- ContainerProtos.ContainerCommandRequestProto readChunkRequest =
- ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
- dispatchRequest(readChunkRequest);
- }
-
- public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
- Pipeline pipeline, boolean incremental) throws Exception {
- return putBlock(containerID, pipeline, incremental,
ContainerProtos.Result.SUCCESS);
- }
-
- public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
- Pipeline pipeline, boolean incremental, ContainerProtos.Result
expectedResult) throws Exception {
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- getWriteChunk(containerID, pipeline);
- dispatchRequest(writeChunkRequest);
-
- ContainerProtos.ContainerCommandRequestProto putBlockRequest =
- ContainerTestHelper.getPutBlockRequest(pipeline,
- writeChunkRequest.getWriteChunk(), incremental);
- dispatchRequest(putBlockRequest, expectedResult);
-
- return writeChunkRequest.getWriteChunk();
- }
-
- public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
- long containerID, Pipeline pipeline) throws Exception {
- return ContainerTestHelper.getWriteChunkRequest(pipeline,
- ContainerTestHelper.getTestBlockID(containerID), 100);
- }
-
- public Pipeline getPipeline() {
- return MockPipeline.createPipeline(
- Collections.singletonList(dsm.getDatanodeDetails()));
- }
-
- public long addContainer(Pipeline pipeline)
- throws Exception {
- long containerID = random.nextInt(Integer.MAX_VALUE);
- ContainerProtos.ContainerCommandRequestProto createContainerRequest =
- ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
- dispatchRequest(createContainerRequest);
-
- return containerID;
- }
-
- public void deleteContainer(long containerID, Pipeline pipeline)
- throws Exception {
- ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
- ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
- dispatchRequest(deleteContainerRequest);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline)
- throws Exception {
- closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline,
- ContainerProtos.Result expectedResult) throws Exception {
- ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
- ContainerTestHelper.getCloseContainer(pipeline, containerID);
- dispatchRequest(closeContainerRequest, expectedResult);
- }
-
- public void finalizeBlock(long containerID, long localID,
ContainerProtos.Result expectedResult) {
- ContainerInfo container = mock(ContainerInfo.class);
- when(container.getContainerID()).thenReturn(containerID);
-
- ContainerProtos.ContainerCommandRequestProto finalizeBlockRequest =
- ContainerTestHelper.getFinalizeBlockRequest(localID, container,
UUID.randomUUID().toString());
-
- dispatchRequest(finalizeBlockRequest, expectedResult);
- }
-
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request) {
- dispatchRequest(request, ContainerProtos.Result.SUCCESS);
- }
-
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request,
- ContainerProtos.Result expectedResult) {
- ContainerProtos.ContainerCommandResponseProto response =
- dsm.getContainer().getDispatcher().dispatch(request, null);
- assertEquals(expectedResult, response.getResult());
- }
-
- /// VOLUME OPERATIONS ///
-
- /**
- * Append a datanode volume to the existing volumes in the configuration.
- * @return The root directory for the new volume.
- */
- public File addHddsVolume() throws IOException {
-
- File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
- .toString())).toFile();
- String[] existingVolumes =
- conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
- List<String> allVolumes = new ArrayList<>();
- if (existingVolumes != null) {
- allVolumes.addAll(Arrays.asList(existingVolumes));
- }
-
- allVolumes.add(vol.getAbsolutePath());
- conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
- allVolumes.toArray(new String[0]));
-
- return vol;
- }
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
index 23b7da2634..fc599f7f91 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.upgrade;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -29,21 +28,17 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.DbVolume;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -52,15 +47,10 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -91,8 +81,6 @@ public class TestDatanodeUpgradeToSchemaV3 {
private RPC.Server scmRpcServer;
private InetSocketAddress address;
- private Random random;
-
private void initTests(Boolean enable) throws Exception {
boolean schemaV3Enabled = enable;
conf = new OzoneConfiguration();
@@ -106,8 +94,6 @@ public class TestDatanodeUpgradeToSchemaV3 {
}
private void setup() throws Exception {
- random = new Random();
-
address = SCMTestUtils.getReuseableAddress();
conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
@@ -135,10 +121,12 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testDBOnHddsVolume(boolean schemaV3Enabled) throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
.getVolumesList().get(0);
assertNull(dataVolume.getDbVolume());
@@ -170,11 +158,13 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testDBOnDbVolume(boolean schemaV3Enabled) throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
- addDbVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ UpgradeTestHelper.addDbVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
.getVolumesList().get(0);
assertNull(dataVolume.getDbParentDir());
@@ -209,9 +199,10 @@ public class TestDatanodeUpgradeToSchemaV3 {
throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
// add one HddsVolume
- addHddsVolume();
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
// Set layout version.
DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
@@ -232,7 +223,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
assertNull(dataVolume.getDbParentDir());
// Restart DN and finalize upgrade
- restartDatanode(
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
dsm.finalizeUpgrade();
@@ -255,13 +246,15 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testFinalizeTwice(boolean schemaV3Enabled) throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
// add one HddsVolume and two DbVolume
- addHddsVolume();
- addDbVolume();
- addDbVolume();
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ UpgradeTestHelper.addDbVolume(conf, tempFolder);
+ UpgradeTestHelper.addDbVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
dsm.finalizeUpgrade();
DbVolume dbVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
@@ -283,15 +276,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
dsm.finalizeUpgrade();
// Add a new HddsVolume. It should have DB created after DN restart.
- addHddsVolume();
- restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
+ HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
false);
for (StorageVolume vol:
dsm.getContainer().getVolumeSet().getVolumesList()) {
@@ -314,10 +310,12 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testAddDbVolumeAfterFinalize(boolean schemaV3Enabled)
throws Exception {
initTests(schemaV3Enabled);
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
HddsVolume hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
.getVolumesList().get(0);
assertNull(hddsVolume.getDbParentDir());
@@ -328,8 +326,9 @@ public class TestDatanodeUpgradeToSchemaV3 {
hddsVolume.getStorageDir().getAbsolutePath()));
// Add a new DbVolume
- addDbVolume();
- restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+ UpgradeTestHelper.addDbVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
+ HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
false);
// HddsVolume should still use the rocksDB under it's volume
@@ -354,15 +353,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
dsm.finalizeUpgrade();
- addDbVolume();
- File newDataVolume = addHddsVolume();
- restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+ UpgradeTestHelper.addDbVolume(conf, tempFolder);
+ File newDataVolume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
+ HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
false);
DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
@@ -419,18 +421,22 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testWrite(boolean enable, String expectedVersion)
throws Exception {
// start DN and SCM
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
// Disable Schema V3
conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED, false);
- startPreFinalizedDatanode();
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+ ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
dsm.finalizeUpgrade();
- final Pipeline pipeline = getPipeline();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Create a container to write data.
- final long containerID1 = addContainer(pipeline);
- putBlock(containerID1, pipeline);
- closeContainer(containerID1, pipeline);
+ final long containerID1 = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, containerID1, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID1, pipeline);
KeyValueContainer container = (KeyValueContainer)
dsm.getContainer().getContainerSet().getContainer(containerID1);
// When SchemaV3 is disabled, new data should be saved as SchemaV2.
@@ -440,13 +446,15 @@ public class TestDatanodeUpgradeToSchemaV3 {
// Set SchemaV3 enable status
conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
enable);
- restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
+ HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(),
false);
+ dispatcher = dsm.getContainer().getDispatcher();
// Write new data
- final long containerID2 = addContainer(pipeline);
- putBlock(containerID2, pipeline);
- closeContainer(containerID2, pipeline);
+ final long containerID2 = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, containerID2, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID2, pipeline);
container = (KeyValueContainer)
dsm.getContainer().getContainerSet().getContainer(containerID2);
// If SchemaV3 is enabled, new data should be saved as SchemaV3
@@ -464,16 +472,20 @@ public class TestDatanodeUpgradeToSchemaV3 {
throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+ ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Add data to read.
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
- pipeline);
- closeContainer(containerID, pipeline);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
// Create thread to keep reading during finalization.
ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -481,10 +493,10 @@ public class TestDatanodeUpgradeToSchemaV3 {
// Layout version check should be thread safe.
while (!dsm.getLayoutVersionManager()
.isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
}
// Make sure we can read after finalizing too.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
return null;
});
@@ -502,8 +514,9 @@ public class TestDatanodeUpgradeToSchemaV3 {
public void testFinalizeFailure(boolean schemaV3Enabled) throws Exception {
initTests(schemaV3Enabled);
// start DN and SCM
- startScmServer();
- addHddsVolume();
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ new ScmTestMock(CLUSTER_ID), address, 10);
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
// Let HddsVolume be formatted to mimic the real cluster upgrade
// Set layout version.
DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
@@ -523,15 +536,17 @@ public class TestDatanodeUpgradeToSchemaV3 {
assertNull(dataVolume.getDbParentDir());
// Restart DN
- restartDatanode(
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+ ContainerDispatcher dispatcher = dsm.getContainer().getDispatcher();
// Write some data.
- final Pipeline pipeline = getPipeline();
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
- pipeline);
- closeContainer(containerID, pipeline);
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
KeyValueContainer container = (KeyValueContainer)
dsm.getContainer().getContainerSet().getContainer(containerID);
assertEquals(OzoneConsts.SCHEMA_V2,
@@ -558,227 +573,18 @@ public class TestDatanodeUpgradeToSchemaV3 {
dsm.getContainer().getContainerSet().getContainer(containerID);
assertEquals(OzoneConsts.SCHEMA_V2,
container.getContainerData().getSchemaVersion());
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
// SchemaV3 is not finalized, so still ERASURE_CODED_STORAGE_SUPPORT
- restartDatanode(
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, false, tempFolder,
address,
HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+ dispatcher = dsm.getContainer().getDispatcher();
// Old data is readable after DN restart
container = (KeyValueContainer)
dsm.getContainer().getContainerSet().getContainer(containerID);
assertEquals(OzoneConsts.SCHEMA_V2,
container.getContainerData().getSchemaVersion());
- readChunk(writeChunk, pipeline);
- }
-
- public void checkContainerPathID(long containerID, String expectedID) {
- KeyValueContainerData data =
- (KeyValueContainerData) dsm.getContainer().getContainerSet()
- .getContainer(containerID).getContainerData();
- assertThat(data.getChunksPath()).contains(expectedID);
- assertThat(data.getMetadataPath()).contains(expectedID);
- }
-
- public List<File> getHddsSubdirs(File volume) {
- File[] subdirsArray = getHddsRoot(volume).listFiles(File::isDirectory);
- assertNotNull(subdirsArray);
- return Arrays.asList(subdirsArray);
- }
-
- public File getHddsRoot(File volume) {
- return new File(HddsVolumeUtil.getHddsRoot(volume.getAbsolutePath()));
- }
-
- /**
- * Starts the datanode with the fore layout version, and calls the version
- * endpoint task to get cluster ID and SCM ID.
- *
- * The daemon for the datanode state machine is not started in this test.
- * This greatly speeds up execution time.
- * It means we do not have heartbeat functionality or pre-finalize
- * upgrade actions, but neither of those things are needed for these tests.
- */
- public void startPreFinalizedDatanode() throws Exception {
- // Set layout version.
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
- DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
- UUID.randomUUID().toString(),
- HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
- layoutStorage.initialize();
-
- // Build and start the datanode.
- DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
- DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
- int actualMlv =
newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
- assertEquals(
- HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(),
- actualMlv);
- if (dsm != null) {
- dsm.close();
- }
- dsm = newDsm;
-
- callVersionEndpointTask();
- }
-
- public void restartDatanode(int expectedMlv, boolean exactMatch)
- throws Exception {
- // Stop existing datanode.
- DatanodeDetails dd = dsm.getDatanodeDetails();
- dsm.close();
-
- // Start new datanode with the same configuration.
- dsm = new DatanodeStateMachine(dd, conf);
- int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
- if (exactMatch) {
- assertEquals(expectedMlv, mlv);
- } else {
- assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
- }
-
- callVersionEndpointTask();
- }
-
- /**
- * Get the cluster ID and SCM ID from SCM to the datanode.
- */
- public void callVersionEndpointTask() throws Exception {
- try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
- address, 1000)) {
- VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
- dsm.getContainer());
- esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
- vet.call();
- }
- }
-
- public String startScmServer() throws IOException {
- String scmID = UUID.randomUUID().toString();
- ScmTestMock scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
- scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
- scmServerImpl, address, 10);
- return scmID;
- }
-
- /// CONTAINER OPERATIONS ///
- public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
- Pipeline pipeline) throws Exception {
- ContainerProtos.ContainerCommandRequestProto readChunkRequest =
- ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
- dispatchRequest(readChunkRequest);
- }
-
- public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
- Pipeline pipeline) throws Exception {
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- getWriteChunk(containerID, pipeline);
- dispatchRequest(writeChunkRequest);
-
- ContainerProtos.ContainerCommandRequestProto putBlockRequest =
- ContainerTestHelper.getPutBlockRequest(pipeline,
- writeChunkRequest.getWriteChunk());
- dispatchRequest(putBlockRequest);
-
- return writeChunkRequest.getWriteChunk();
- }
-
- public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
- long containerID, Pipeline pipeline) throws Exception {
- return ContainerTestHelper.getWriteChunkRequest(pipeline,
- ContainerTestHelper.getTestBlockID(containerID), 100);
- }
-
- public Pipeline getPipeline() {
- return MockPipeline.createPipeline(
- Collections.singletonList(dsm.getDatanodeDetails()));
- }
-
- public long addContainer(Pipeline pipeline)
- throws Exception {
- long containerID = random.nextInt(Integer.MAX_VALUE);
- ContainerProtos.ContainerCommandRequestProto createContainerRequest =
- ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
- dispatchRequest(createContainerRequest);
-
- return containerID;
- }
-
- public void deleteContainer(long containerID, Pipeline pipeline)
- throws Exception {
- ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
- ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
- dispatchRequest(deleteContainerRequest);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline)
- throws Exception {
- closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline,
- ContainerProtos.Result expectedResult) throws Exception {
- ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
- ContainerTestHelper.getCloseContainer(pipeline, containerID);
- dispatchRequest(closeContainerRequest, expectedResult);
- }
-
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request) {
- dispatchRequest(request, ContainerProtos.Result.SUCCESS);
- }
-
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request,
- ContainerProtos.Result expectedResult) {
- ContainerProtos.ContainerCommandResponseProto response =
- dsm.getContainer().getDispatcher().dispatch(request, null);
- assertEquals(expectedResult, response.getResult());
- }
-
- /// VOLUME OPERATIONS ///
-
- /**
- * Append a datanode volume to the existing volumes in the configuration.
- * @return The root directory for the new volume.
- */
- public File addHddsVolume() throws IOException {
-
- File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
- .toString())).toFile();
- String[] existingVolumes =
- conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
- List<String> allVolumes = new ArrayList<>();
- if (existingVolumes != null) {
- allVolumes.addAll(Arrays.asList(existingVolumes));
- }
-
- allVolumes.add(vol.getAbsolutePath());
- conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
- allVolumes.toArray(new String[0]));
-
- return vol;
- }
-
- /**
- * Append a db volume to the existing volumes in the configuration.
- * @return The root directory for the new volume.
- */
- public File addDbVolume() throws Exception {
- File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
- .toString())).toFile();
- String[] existingVolumes =
- conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
- List<String> allVolumes = new ArrayList<>();
- if (existingVolumes != null) {
- allVolumes.addAll(Arrays.asList(existingVolumes));
- }
-
- allVolumes.add(vol.getAbsolutePath());
- conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
- allVolumes.toArray(new String[0]));
-
- return vol;
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 59b88bcbea..d4a27e74cd 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.ozone.container.upgrade;
-import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -28,17 +26,12 @@ import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
-import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
-import
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.replication.ContainerImporter;
import
org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
@@ -55,11 +48,9 @@ import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -81,6 +72,7 @@ public class TestDatanodeUpgradeToScmHA {
private Path tempFolder;
private DatanodeStateMachine dsm;
+ private ContainerDispatcher dispatcher;
private OzoneConfiguration conf;
private static final String CLUSTER_ID = "clusterID";
private boolean scmHAAlreadyEnabled;
@@ -89,8 +81,6 @@ public class TestDatanodeUpgradeToScmHA {
private InetSocketAddress address;
private ScmTestMock scmServerImpl;
- private Random random;
-
private void setScmHAEnabled(boolean enableSCMHA)
throws Exception {
this.scmHAAlreadyEnabled = enableSCMHA;
@@ -100,8 +90,6 @@ public class TestDatanodeUpgradeToScmHA {
}
private void setup() throws Exception {
- random = new Random();
-
address = SCMTestUtils.getReuseableAddress();
conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
}
@@ -124,15 +112,18 @@ public class TestDatanodeUpgradeToScmHA {
setScmHAEnabled(enableSCMHA);
// start DN and SCM
startScmServer();
- addVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Add data to read.
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
- pipeline);
- closeContainer(containerID, pipeline);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
// Create thread to keep reading during finalization.
ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -140,10 +131,10 @@ public class TestDatanodeUpgradeToScmHA {
// Layout version check should be thread safe.
while (!dsm.getLayoutVersionManager()
.isAllowed(HDDSLayoutFeature.SCM_HA)) {
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
}
// Make sure we can read after finalizing too.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
return null;
});
@@ -159,40 +150,45 @@ public class TestDatanodeUpgradeToScmHA {
setScmHAEnabled(enableSCMHA);
// start DN and SCM
startScmServer();
- addVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
// Pre-export a container to continuously import and delete.
- final long exportContainerID = addContainer(pipeline);
+ final long exportContainerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
ContainerProtos.WriteChunkRequestProto exportWriteChunk =
- putBlock(exportContainerID, pipeline);
- closeContainer(exportContainerID, pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, exportContainerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, exportContainerID, pipeline);
File exportedContainerFile = exportContainer(exportContainerID);
- deleteContainer(exportContainerID, pipeline);
+ UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID, pipeline);
// Export another container to import while pre-finalized and read
// finalized.
- final long exportContainerID2 = addContainer(pipeline);
+ final long exportContainerID2 = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
ContainerProtos.WriteChunkRequestProto exportWriteChunk2 =
- putBlock(exportContainerID2, pipeline);
- closeContainer(exportContainerID2, pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, exportContainerID2, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, exportContainerID2, pipeline);
File exportedContainerFile2 = exportContainer(exportContainerID2);
- deleteContainer(exportContainerID2, pipeline);
+ UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID2,
pipeline);
// Make sure we can import and read a container pre-finalized.
importContainer(exportContainerID2, exportedContainerFile2);
- readChunk(exportWriteChunk2, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
// Now SCM and enough other DNs finalize to enable SCM HA. This DN is
// restarted with SCM HA config and gets a different SCM ID.
conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
changeScmID();
- restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+ dispatcher = dsm.getContainer().getDispatcher();
// Make sure the existing container can be read.
- readChunk(exportWriteChunk2, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
// Create thread to keep importing containers during the upgrade.
// Since the datanode's MLV is behind SCM's, container creation is not
@@ -204,12 +200,12 @@ public class TestDatanodeUpgradeToScmHA {
while (!dsm.getLayoutVersionManager()
.isAllowed(HDDSLayoutFeature.SCM_HA)) {
importContainer(exportContainerID, exportedContainerFile);
- readChunk(exportWriteChunk, pipeline);
- deleteContainer(exportContainerID, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk, pipeline);
+ UpgradeTestHelper.deleteContainer(dispatcher, exportContainerID,
pipeline);
}
// Make sure we can import after finalizing too.
importContainer(exportContainerID, exportedContainerFile);
- readChunk(exportWriteChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk, pipeline);
return null;
});
@@ -220,7 +216,7 @@ public class TestDatanodeUpgradeToScmHA {
// Make sure we can read the container that was imported while
// pre-finalized after finalizing.
- readChunk(exportWriteChunk2, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, exportWriteChunk2, pipeline);
}
@ParameterizedTest(name = "{index}: scmHAAlreadyEnabled={0}")
@@ -230,10 +226,14 @@ public class TestDatanodeUpgradeToScmHA {
setScmHAEnabled(enableSCMHA);
/// SETUP ///
- String originalScmID = startScmServer();
- File volume = addVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ startScmServer();
+ String originalScmID = scmServerImpl.getScmId();
+ File volume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
/// PRE-FINALIZED: Write and Read from formatted volume ///
@@ -243,10 +243,10 @@ public class TestDatanodeUpgradeToScmHA {
dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
// Add container with data, make sure it can be read and written.
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
- pipeline);
- readChunk(writeChunk, pipeline);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
checkPreFinalizedVolumePathID(volume, originalScmID, CLUSTER_ID);
checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
@@ -256,7 +256,7 @@ public class TestDatanodeUpgradeToScmHA {
failVolume(volume);
// Since volume is failed, container should be marked unhealthy.
// Finalization should proceed anyways.
- closeContainer(containerID, pipeline,
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline,
ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR);
State containerState = dsm.getContainer().getContainerSet()
.getContainer(containerID).getContainerState();
@@ -286,11 +286,13 @@ public class TestDatanodeUpgradeToScmHA {
// imported to it.
// This should log a warning about reading from an unhealthy container
// but otherwise proceed successfully.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
/// FINALIZED: Restart datanode to upgrade the failed volume ///
- restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder,
address,
+ HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+ dispatcher = dsm.getContainer().getDispatcher();
assertEquals(1,
dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -301,12 +303,12 @@ public class TestDatanodeUpgradeToScmHA {
checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
// Read container from before upgrade. The upgrade required it to be
closed.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
// Write and read container after upgrade.
- long newContainerID = addContainer(pipeline);
+ long newContainerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
ContainerProtos.WriteChunkRequestProto newWriteChunk =
- putBlock(newContainerID, pipeline);
- readChunk(newWriteChunk, pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, newContainerID, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, newWriteChunk, pipeline);
// The new container should use cluster ID in its path.
// The volume it is placed on is up to the implementation.
checkContainerPathID(newContainerID, CLUSTER_ID);
@@ -318,10 +320,14 @@ public class TestDatanodeUpgradeToScmHA {
setScmHAEnabled(enableSCMHA);
/// SETUP ///
- String originalScmID = startScmServer();
- File preFinVolume1 = addVolume();
- startPreFinalizedDatanode();
- final Pipeline pipeline = getPipeline();
+ startScmServer();
+ String originalScmID = scmServerImpl.getScmId();
+ File preFinVolume1 = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
+ dsm = UpgradeTestHelper.startPreFinalizedDatanode(conf, tempFolder, dsm,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
+ dispatcher = dsm.getContainer().getDispatcher();
+ final Pipeline pipeline = MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
/// PRE-FINALIZED: Write and Read from formatted volume ///
@@ -331,10 +337,10 @@ public class TestDatanodeUpgradeToScmHA {
dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
// Add container with data, make sure it can be read and written.
- final long containerID = addContainer(pipeline);
- ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
- pipeline);
- readChunk(writeChunk, pipeline);
+ final long containerID = UpgradeTestHelper.addContainer(dispatcher,
pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk =
+ UpgradeTestHelper.putBlock(dispatcher, containerID, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
checkPreFinalizedVolumePathID(preFinVolume1, originalScmID, CLUSTER_ID);
checkContainerPathID(containerID, originalScmID, CLUSTER_ID);
@@ -346,9 +352,11 @@ public class TestDatanodeUpgradeToScmHA {
conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
changeScmID();
// A new volume is added that must be formatted.
- File preFinVolume2 = addVolume();
+ File preFinVolume2 = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
- restartDatanode(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder,
address,
+ HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(), true);
+ dispatcher = dsm.getContainer().getDispatcher();
assertEquals(2,
dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -356,7 +364,7 @@ public class TestDatanodeUpgradeToScmHA {
dsm.getContainer().getVolumeSet().getFailedVolumesList().size());
// Because DN mlv would be behind SCM mlv, only reads are allowed.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
// On restart, there should have been no changes to the paths already used.
checkPreFinalizedVolumePathID(preFinVolume1, originalScmID, CLUSTER_ID);
@@ -369,7 +377,7 @@ public class TestDatanodeUpgradeToScmHA {
/// FINALIZE ///
- closeContainer(containerID, pipeline);
+ UpgradeTestHelper.closeContainer(dispatcher, containerID, pipeline);
dsm.finalizeUpgrade();
LambdaTestUtils.await(2000, 500,
() -> dsm.getLayoutVersionManager()
@@ -379,11 +387,13 @@ public class TestDatanodeUpgradeToScmHA {
// Add a new volume that should be formatted with cluster ID only, since
// DN has finalized.
- File finVolume = addVolume();
+ File finVolume = UpgradeTestHelper.addHddsVolume(conf, tempFolder);
// Yet another SCM ID is received this time, but it should not matter.
changeScmID();
- restartDatanode(HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+ dsm = UpgradeTestHelper.restartDatanode(conf, dsm, true, tempFolder,
address,
+ HDDSLayoutFeature.SCM_HA.layoutVersion(), false);
+ dispatcher = dsm.getContainer().getDispatcher();
assertEquals(3,
dsm.getContainer().getVolumeSet().getVolumesList().size());
@@ -400,12 +410,12 @@ public class TestDatanodeUpgradeToScmHA {
/// FINALIZED: Read old data and write + read new data ///
// Read container from before upgrade. The upgrade required it to be
closed.
- readChunk(writeChunk, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, writeChunk, pipeline);
// Write and read container after upgrade.
- long newContainerID = addContainer(pipeline);
+ long newContainerID = UpgradeTestHelper.addContainer(dispatcher, pipeline);
ContainerProtos.WriteChunkRequestProto newWriteChunk =
- putBlock(newContainerID, pipeline);
- readChunk(newWriteChunk, pipeline);
+ UpgradeTestHelper.putBlock(dispatcher, newContainerID, pipeline);
+ UpgradeTestHelper.readChunk(dispatcher, newWriteChunk, pipeline);
// The new container should use cluster ID in its path.
// The volume it is placed on is up to the implementation.
checkContainerPathID(newContainerID, CLUSTER_ID);
@@ -496,82 +506,18 @@ public class TestDatanodeUpgradeToScmHA {
/// CLUSTER OPERATIONS ///
- /**
- * Starts the datanode with the first layout version, and calls the version
- * endpoint task to get cluster ID and SCM ID.
- *
- * The daemon for the datanode state machine is not started in this test.
- * This greatly speeds up execution time.
- * It means we do not have heartbeat functionality or pre-finalize
- * upgrade actions, but neither of those things are needed for these tests.
- */
- public void startPreFinalizedDatanode() throws Exception {
- // Set layout version.
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
- tempFolder.toString());
- DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
- UUID.randomUUID().toString(),
- HDDSLayoutFeature.INITIAL_VERSION.layoutVersion());
- layoutStorage.initialize();
-
- // Build and start the datanode.
- DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
- DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
- int actualMlv =
newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
- assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
- actualMlv);
- dsm = newDsm;
-
- callVersionEndpointTask();
- }
-
- public void restartDatanode(int expectedMlv, boolean exactMatch)
- throws Exception {
- // Stop existing datanode.
- DatanodeDetails dd = dsm.getDatanodeDetails();
- dsm.close();
-
- // Start new datanode with the same configuration.
- dsm = new DatanodeStateMachine(dd, conf);
-
StorageVolumeUtil.getHddsVolumesList(dsm.getContainer().getVolumeSet().getVolumesList())
- .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempFolder.toFile()));
- int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
- if (exactMatch) {
- assertEquals(expectedMlv, mlv);
- } else {
- assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
- }
-
- callVersionEndpointTask();
- }
-
- /**
- * Get the cluster ID and SCM ID from SCM to the datanode.
- */
- public void callVersionEndpointTask() throws Exception {
- try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
- address, 1000)) {
- VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
- dsm.getContainer());
- esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
- vet.call();
- }
- }
-
- public String startScmServer() throws Exception {
- String scmID = UUID.randomUUID().toString();
- scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
+ private void startScmServer() throws Exception {
+ scmServerImpl = new ScmTestMock(CLUSTER_ID);
scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
scmServerImpl, address, 10);
- return scmID;
}
/**
* Updates the SCM ID on the SCM server. Datanode will not be aware of this
- * until {@link this#callVersionEndpointTask} is called.
+ * until {@link UpgradeTestHelper#callVersionEndpointTask} is called.
* @return the new scm ID.
*/
- public String changeScmID() {
+ private String changeScmID() {
String scmID = UUID.randomUUID().toString();
scmServerImpl.setScmId(scmID);
return scmID;
@@ -579,72 +525,10 @@ public class TestDatanodeUpgradeToScmHA {
/// CONTAINER OPERATIONS ///
- public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
- Pipeline pipeline) throws Exception {
- ContainerProtos.ContainerCommandRequestProto readChunkRequest =
- ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
-
- dispatchRequest(readChunkRequest);
- }
-
- public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
- Pipeline pipeline) throws Exception {
- ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
- getWriteChunk(containerID, pipeline);
- dispatchRequest(writeChunkRequest);
-
- ContainerProtos.ContainerCommandRequestProto putBlockRequest =
- ContainerTestHelper.getPutBlockRequest(pipeline,
- writeChunkRequest.getWriteChunk());
- dispatchRequest(putBlockRequest);
-
- return writeChunkRequest.getWriteChunk();
- }
-
- public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
- long containerID, Pipeline pipeline) throws Exception {
- return ContainerTestHelper.getWriteChunkRequest(pipeline,
- ContainerTestHelper.getTestBlockID(containerID), 100);
- }
-
- public Pipeline getPipeline() {
- return MockPipeline.createPipeline(
- Collections.singletonList(dsm.getDatanodeDetails()));
- }
-
- public long addContainer(Pipeline pipeline)
- throws Exception {
- long containerID = random.nextInt(Integer.MAX_VALUE);
- ContainerProtos.ContainerCommandRequestProto createContainerRequest =
- ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
- dispatchRequest(createContainerRequest);
-
- return containerID;
- }
-
- public void deleteContainer(long containerID, Pipeline pipeline)
- throws Exception {
- ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
- ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
- dispatchRequest(deleteContainerRequest);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline)
- throws Exception {
- closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
- }
-
- public void closeContainer(long containerID, Pipeline pipeline,
- ContainerProtos.Result expectedResult) throws Exception {
- ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
- ContainerTestHelper.getCloseContainer(pipeline, containerID);
- dispatchRequest(closeContainerRequest, expectedResult);
- }
-
/**
* Exports the specified container to a temporary file and returns the file.
*/
- public File exportContainer(long containerId) throws Exception {
+ private File exportContainer(long containerId) throws Exception {
final ContainerReplicationSource replicationSource =
new OnDemandContainerReplicationSource(
dsm.getContainer().getController());
@@ -663,7 +547,7 @@ public class TestDatanodeUpgradeToScmHA {
* Imports the container found in {@code source} to the datanode with the ID
* {@code containerID}.
*/
- public void importContainer(long containerID, File source) throws Exception {
+ private void importContainer(long containerID, File source) throws Exception
{
ContainerImporter replicator =
new ContainerImporter(dsm.getConf(),
dsm.getContainer().getContainerSet(),
@@ -679,43 +563,8 @@ public class TestDatanodeUpgradeToScmHA {
NO_COMPRESSION);
}
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request) {
- dispatchRequest(request, ContainerProtos.Result.SUCCESS);
- }
-
- public void dispatchRequest(
- ContainerProtos.ContainerCommandRequestProto request,
- ContainerProtos.Result expectedResult) {
- ContainerProtos.ContainerCommandResponseProto response =
- dsm.getContainer().getDispatcher().dispatch(request, null);
- assertEquals(expectedResult, response.getResult());
- }
-
/// VOLUME OPERATIONS ///
- /**
- * Append a datanode volume to the existing volumes in the configuration.
- * @return The root directory for the new volume.
- */
- public File addVolume() throws Exception {
-
- File vol = Files.createDirectory(
- tempFolder.resolve(UUID.randomUUID().toString())).toFile();
- String[] existingVolumes =
- conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
- List<String> allVolumes = new ArrayList<>();
- if (existingVolumes != null) {
- allVolumes.addAll(Arrays.asList(existingVolumes));
- }
-
- allVolumes.add(vol.getAbsolutePath());
- conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
- allVolumes.toArray(new String[0]));
-
- return vol;
- }
-
/**
* Renames the specified volume directory so it will appear as failed to
* the datanode.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
new file mode 100644
index 0000000000..28b9163f3c
--- /dev/null
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/UpgradeTestHelper.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import
org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Helpers for upgrade tests.
+ */
+public final class UpgradeTestHelper {
+ private UpgradeTestHelper() {
+ }
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Starts the datanode with the fore layout version, and calls the version
+ * endpoint task to get cluster ID and SCM ID.
+ *
+ * The daemon for the datanode state machine is not started in this test.
+ * This greatly speeds up execution time.
+ * It means we do not have heartbeat functionality or pre-finalize
+ * upgrade actions, but neither of those things are needed for these tests.
+ */
+ public static DatanodeStateMachine startPreFinalizedDatanode(
+ OzoneConfiguration conf, Path tempFolder,
+ DatanodeStateMachine dsm, InetSocketAddress address,
+ int metadataLayoutVersion)
+ throws Exception {
+ // Set layout version.
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, tempFolder.toString());
+ DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+ UUID.randomUUID().toString(),
+ metadataLayoutVersion);
+ layoutStorage.initialize();
+ if (dsm != null) {
+ dsm.close();
+ }
+
+ // Build and start the datanode.
+ DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
+ dsm = new DatanodeStateMachine(dd, conf);
+ int actualMlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+ assertEquals(
+ metadataLayoutVersion,
+ actualMlv);
+
+
+ callVersionEndpointTask(conf, dsm.getContainer(), address);
+ return dsm;
+ }
+
+ public static DatanodeStateMachine restartDatanode(
+ OzoneConfiguration conf, DatanodeStateMachine dsm, boolean
shouldSetDbParentDir,
+ Path tempFolder, InetSocketAddress address, int expectedMlv, boolean
exactMatch)
+ throws Exception {
+ // Stop existing datanode.
+ DatanodeDetails dd = dsm.getDatanodeDetails();
+ dsm.close();
+
+ // Start new datanode with the same configuration.
+ dsm = new DatanodeStateMachine(dd, conf);
+ if (shouldSetDbParentDir) {
+
StorageVolumeUtil.getHddsVolumesList(dsm.getContainer().getVolumeSet().getVolumesList())
+ .forEach(hddsVolume ->
hddsVolume.setDbParentDir(tempFolder.toFile()));
+ }
+ int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+ if (exactMatch) {
+ assertEquals(expectedMlv, mlv);
+ } else {
+ assertThat(expectedMlv).isLessThanOrEqualTo(mlv);
+ }
+
+ callVersionEndpointTask(conf, dsm.getContainer(), address);
+ return dsm;
+ }
+
+ /**
+ * Get the cluster ID and SCM ID from SCM to the datanode.
+ */
+ public static void callVersionEndpointTask(
+ OzoneConfiguration conf, OzoneContainer container, InetSocketAddress
address)
+ throws Exception {
+ try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
+ address, 1000)) {
+ VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
+ container);
+ esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+ vet.call();
+ }
+ }
+
+ /**
+ * Append a datanode volume to the existing volumes in the configuration.
+ * @return The root directory for the new volume.
+ */
+ public static File addHddsVolume(OzoneConfiguration conf, Path tempFolder)
throws IOException {
+
+ File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
+ .toString())).toFile();
+ String[] existingVolumes =
+ conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
+ List<String> allVolumes = new ArrayList<>();
+ if (existingVolumes != null) {
+ allVolumes.addAll(Arrays.asList(existingVolumes));
+ }
+
+ allVolumes.add(vol.getAbsolutePath());
+ conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+ allVolumes.toArray(new String[0]));
+
+ return vol;
+ }
+
+ /**
+ * Append a db volume to the existing volumes in the configuration.
+ * @return The root directory for the new volume.
+ */
+ public static File addDbVolume(OzoneConfiguration conf, Path tempFolder)
throws Exception {
+ File vol = Files.createDirectory(tempFolder.resolve(UUID.randomUUID()
+ .toString())).toFile();
+ String[] existingVolumes =
+ conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
+ List<String> allVolumes = new ArrayList<>();
+ if (existingVolumes != null) {
+ allVolumes.addAll(Arrays.asList(existingVolumes));
+ }
+
+ allVolumes.add(vol.getAbsolutePath());
+ conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+ allVolumes.toArray(new String[0]));
+
+ return vol;
+ }
+
+
+ public static void dispatchRequest(
+ ContainerDispatcher dispatcher,
+ ContainerProtos.ContainerCommandRequestProto request) {
+ dispatchRequest(dispatcher, request, ContainerProtos.Result.SUCCESS);
+ }
+
+ public static void dispatchRequest(
+ ContainerDispatcher dispatcher,
ContainerProtos.ContainerCommandRequestProto request,
+ ContainerProtos.Result expectedResult) {
+ ContainerProtos.ContainerCommandResponseProto response =
+ dispatcher.dispatch(request, null);
+ assertEquals(expectedResult, response.getResult());
+ }
+
+ public static void readChunk(
+ ContainerDispatcher dispatcher, ContainerProtos.WriteChunkRequestProto
writeChunk,
+ Pipeline pipeline) throws Exception {
+ ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+ ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
+ dispatchRequest(dispatcher, readChunkRequest);
+ }
+
+ public static ContainerProtos.WriteChunkRequestProto putBlock(
+ ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+ boolean incremental) throws Exception {
+ return putBlock(dispatcher, containerID, pipeline, incremental,
ContainerProtos.Result.SUCCESS);
+ }
+
+ public static ContainerProtos.WriteChunkRequestProto putBlock(
+ ContainerDispatcher dispatcher, long containerID, Pipeline pipeline)
throws Exception {
+ return putBlock(dispatcher, containerID, pipeline, false,
ContainerProtos.Result.SUCCESS);
+ }
+
+ public static ContainerProtos.WriteChunkRequestProto putBlock(
+ ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+ boolean incremental, ContainerProtos.Result expectedResult) throws
Exception {
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper.getWriteChunkRequest(pipeline,
+ ContainerTestHelper.getTestBlockID(containerID), 100);
+ dispatchRequest(dispatcher, writeChunkRequest);
+
+ ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+ ContainerTestHelper.getPutBlockRequest(pipeline,
+ writeChunkRequest.getWriteChunk(), incremental);
+ dispatchRequest(dispatcher, putBlockRequest, expectedResult);
+ return writeChunkRequest.getWriteChunk();
+ }
+
+ public static long addContainer(ContainerDispatcher dispatcher, Pipeline
pipeline)
+ throws Exception {
+ long containerID = RANDOM.nextInt(Integer.MAX_VALUE);
+ ContainerProtos.ContainerCommandRequestProto createContainerRequest =
+ ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
+ dispatchRequest(dispatcher, createContainerRequest);
+
+ return containerID;
+ }
+
+ public static void deleteContainer(ContainerDispatcher dispatcher, long
containerID, Pipeline pipeline)
+ throws Exception {
+ ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
+ ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
+ dispatchRequest(dispatcher, deleteContainerRequest);
+ }
+
+ public static void closeContainer(ContainerDispatcher dispatcher, long
containerID, Pipeline pipeline)
+ throws Exception {
+ closeContainer(dispatcher, containerID, pipeline,
ContainerProtos.Result.SUCCESS);
+ }
+
+ public static void closeContainer(
+ ContainerDispatcher dispatcher, long containerID, Pipeline pipeline,
+ ContainerProtos.Result expectedResult) throws Exception {
+ ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
+ ContainerTestHelper.getCloseContainer(pipeline, containerID);
+ dispatchRequest(dispatcher, closeContainerRequest, expectedResult);
+ }
+
+ public static void finalizeBlock(
+ ContainerDispatcher dispatcher, long containerID, long localID,
ContainerProtos.Result expectedResult) {
+ ContainerInfo container = mock(ContainerInfo.class);
+ when(container.getContainerID()).thenReturn(containerID);
+
+ ContainerProtos.ContainerCommandRequestProto finalizeBlockRequest =
+ ContainerTestHelper.getFinalizeBlockRequest(localID, container,
UUID.randomUUID().toString());
+
+ UpgradeTestHelper.dispatchRequest(dispatcher, finalizeBlockRequest,
expectedResult);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]