This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9d5cfd6934 HDDS-7083. Spread container-copy directories (#3648)
9d5cfd6934 is described below
commit 9d5cfd6934cc6ec34d3c78b99af90888e05f21af
Author: Symious <[email protected]>
AuthorDate: Thu Jan 12 13:08:36 2023 +0800
HDDS-7083. Spread container-copy directories (#3648)
---
.../hadoop/hdds/conf/OzoneConfiguration.java | 5 +-
.../common/src/main/resources/ozone-default.xml | 6 +-
.../container/common/helpers/ContainerUtils.java | 20 ++++
.../common/interfaces/ContainerPacker.java | 3 +-
.../common/statemachine/DatanodeStateMachine.java | 4 +-
.../container/common/volume/StorageVolume.java | 4 +
.../container/keyvalue/KeyValueContainer.java | 81 ++++++++-----
.../ozone/container/keyvalue/KeyValueHandler.java | 15 +--
.../container/keyvalue/TarContainerPacker.java | 131 +++++++++++++++------
.../helpers/KeyValueContainerLocationUtil.java | 41 +++++--
.../container/ozoneimpl/ContainerController.java | 3 +-
.../container/replication/ContainerDownloader.java | 2 +-
.../replication/DownloadAndImportReplicator.java | 114 +++++++++++++-----
.../replication/GrpcReplicationClient.java | 5 +-
.../replication/SimpleContainerDownloader.java | 33 +++---
.../common/helpers/TestContainerUtils.java | 10 ++
.../container/keyvalue/TestTarContainerPacker.java | 29 +++--
.../replication/TestReplicationSupervisor.java | 19 ++-
.../replication/TestSimpleContainerDownloader.java | 50 ++++++--
.../upgrade/TestDatanodeUpgradeToScmHA.java | 11 +-
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 2 -
.../ozoneimpl/TestOzoneContainerWithTLS.java | 5 +-
.../ozone/freon/ClosedContainerReplicator.java | 4 +-
23 files changed, 420 insertions(+), 177 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java
index f35073c472..91415c34ec 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/conf/OzoneConfiguration.java
@@ -46,6 +46,7 @@ import com.google.common.base.Preconditions;
import org.apache.ratis.server.RaftServerConfigKeys;
import static org.apache.hadoop.hdds.ratis.RatisHelper.HDDS_DATANODE_RATIS_PREFIX_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR;
/**
* Configuration for ozone.
@@ -308,7 +309,9 @@ public class OzoneConfiguration extends Configuration
new DeprecationDelta("dfs.datanode.keytab.file",
DFSConfigKeysLegacy.DFS_DATANODE_KERBEROS_KEYTAB_FILE_KEY),
new DeprecationDelta("ozone.scm.chunk.layout",
- ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY)
+ ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY),
+ new DeprecationDelta("hdds.datanode.replication.work.dir",
+ OZONE_CONTAINER_COPY_WORKDIR)
});
}
}
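For context, a DeprecationDelta makes Hadoop's Configuration layer redirect the old key to the new one, so existing cluster configs keep working. A minimal sketch of that behavior, assuming the registration above has run (the wrapper class name is made up):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;

public class DeprecationDemo {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Setting the deprecated key...
    conf.set("hdds.datanode.replication.work.dir", "/data/tmp");
    // ...is redirected by the deprecation framework, so the new key
    // observes the same value (and a deprecation warning is logged).
    System.out.println(conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR));
  }
}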
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index d23473ec85..a1c8787775 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1579,9 +1579,9 @@
<property>
<name>hdds.datanode.replication.work.dir</name>
<tag>DATANODE</tag>
- <description>Temporary which is used during the container replication
- betweeen datanodes. Should have enough space to store multiple container
- (in compressed format), but doesn't require fast io access such as SSD.
+ <description>This configuration is deprecated. Temporary sub directory under
+ each hdds.datanode.dir will be used during the container replication between
+ datanodes to save the downloaded container (in compressed format).
</description>
</property>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index e0480735e5..81a6935098 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -33,6 +33,8 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -278,4 +280,22 @@ public final class ContainerUtils {
return Long.parseLong(containerBaseDir.getName());
}
+ public static String getContainerTarGzName(long containerId) {
+ return "container-" + containerId + ".tar.gz";
+ }
+
+ public static long retrieveContainerIdFromTarGzName(String tarGzName)
+ throws IOException {
+ assert tarGzName != null;
+ Pattern pattern = Pattern.compile("container-(\\d+)\\.tar\\.gz");
+ // Match the id portion of the file name.
+ Matcher m = pattern.matcher(tarGzName);
+
+ if (m.find()) {
+ return Long.parseLong(m.group(1));
+ } else {
+ throw new IOException("Illegal container tar gz file " +
+ tarGzName);
+ }
+ }
}
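A quick round-trip of the two new ContainerUtils helpers (illustrative only; the wrapper class is made up):

import java.io.IOException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;

public class TarGzNameDemo {
  public static void main(String[] args) throws IOException {
    // Builds "container-42.tar.gz".
    String name = ContainerUtils.getContainerTarGzName(42L);
    // Parses the id back out; a malformed name raises IOException.
    long id = ContainerUtils.retrieveContainerIdFromTarGzName(name);
    System.out.println(name + " -> " + id);
  }
}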
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
index 8308c23866..a7c7f5ad20 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.interfaces;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.file.Path;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -39,7 +40,7 @@ public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
* file but returned).
*/
byte[] unpackContainerData(Container<CONTAINERDATA> container,
- InputStream inputStream)
+ InputStream inputStream, Path tmpDir, Path destContainerDir)
throws IOException;
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 99e931f833..4b8f0be1e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -172,10 +172,10 @@ public class DatanodeStateMachine implements Closeable {
nextHB = new AtomicLong(Time.monotonicNow());
ContainerReplicator replicator =
- new DownloadAndImportReplicator(container.getContainerSet(),
+ new DownloadAndImportReplicator(conf, container.getContainerSet(),
container.getController(),
new SimpleContainerDownloader(conf, dnCertClient),
- new TarContainerPacker());
+ new TarContainerPacker(), container.getVolumeSet());
replicatorMetrics = new MeasuredReplicator(replicator);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 561708b852..a5cddc175e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -368,6 +368,10 @@ public abstract class StorageVolume
}
}
+ public String getVolumeRootDir() {
+ return volumeInfo != null ? volumeInfo.getRootDir() : null;
+ }
+
public long getCapacity() {
return volumeInfo != null ? volumeInfo.getCapacity() : 0;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 7412e766d0..bd58b7253b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.util.Collections;
@@ -56,6 +58,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -507,21 +510,22 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
@Override
public void importContainerData(InputStream input,
- ContainerPacker<KeyValueContainerData> packer) throws IOException {
+ ContainerPacker<KeyValueContainerData> packer)
+ throws IOException {
+ HddsVolume hddsVolume = containerData.getVolume();
+ String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
+ hddsVolume, hddsVolume.getClusterID());
+ long containerId = containerData.getContainerID();
+ Path destContainerDir =
+ Paths.get(KeyValueContainerLocationUtil.getBaseContainerLocation(
+ hddsVolume.getHddsRootDir().toString(), idDir, containerId));
+ Path tmpDir = DownloadAndImportReplicator.getUntarDirectory(hddsVolume);
writeLock();
try {
- if (getContainerFile().exists()) {
- String errorMessage = String.format(
- "Can't import container (cid=%d) data to a specific location"
- + " as the container descriptor (%s) has already been exist.",
- getContainerData().getContainerID(),
- getContainerFile().getAbsolutePath());
- throw new StorageContainerException(errorMessage,
- CONTAINER_ALREADY_EXISTS);
- }
//copy the values from the input stream to the final destination
// directory.
- byte[] descriptorContent = packer.unpackContainerData(this, input);
+ byte[] descriptorContent = packer.unpackContainerData(this, input, tmpDir,
+ destContainerDir);
Preconditions.checkNotNull(descriptorContent,
"Container descriptor is missing from the container archive: "
@@ -533,31 +537,28 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
KeyValueContainerData originalContainerData =
(KeyValueContainerData) ContainerDataYaml
.readContainer(descriptorContent);
-
-
- containerData.setState(originalContainerData.getState());
- containerData
- .setContainerDBType(originalContainerData.getContainerDBType());
- containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
-
- //rewriting the yaml file with new checksum calculation.
- update(originalContainerData.getMetadata(), true);
-
- if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
- // load metadata from received dump files before we try to parse kv
- BlockUtils.loadKVContainerDataFromFiles(containerData, config);
+ importContainerData(originalContainerData);
+ } catch (Exception ex) {
+ // clean data under tmp directory
+ try {
+ Path containerUntarDir = tmpDir.resolve(String.valueOf(containerId));
+ if (containerUntarDir.toFile().exists()) {
+ FileUtils.deleteDirectory(containerUntarDir.toFile());
+ }
+ } catch (Exception deleteex) {
+ LOG.error(
+ "Can not cleanup container directory under {} for container {}",
+ tmpDir, containerId, deleteex);
}
- //fill in memory stat counter (keycount, byte usage)
- KeyValueContainerUtil.parseKVContainerData(containerData, config);
-
- } catch (Exception ex) {
+ // Rethrow as-is when the container already exists
if (ex instanceof StorageContainerException &&
((StorageContainerException) ex).getResult() ==
CONTAINER_ALREADY_EXISTS) {
throw ex;
}
- //delete all the temporary data in case of any exception.
+
+ // delete all other temporary data in case of any exception.
try {
if (containerData.getSchemaVersion() != null &&
containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
@@ -570,8 +571,7 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
} catch (Exception deleteex) {
LOG.error(
"Can not cleanup destination directories after a container import"
- + " error (cid" +
- containerData.getContainerID() + ")", deleteex);
+ + " error (cid: {}", containerId, deleteex);
}
throw ex;
} finally {
@@ -579,6 +579,25 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
}
}
+ public void importContainerData(KeyValueContainerData originalContainerData)
+ throws IOException {
+ containerData.setState(originalContainerData.getState());
+ containerData
+ .setContainerDBType(originalContainerData.getContainerDBType());
+ containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
+
+ //rewriting the yaml file with new checksum calculation.
+ update(originalContainerData.getMetadata(), true);
+
+ if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+ // load metadata from received dump files before we try to parse kv
+ BlockUtils.loadKVContainerDataFromFiles(containerData, config);
+ }
+
+ //fill in memory stat counter (keycount, byte usage)
+ KeyValueContainerUtil.parseKVContainerData(containerData, config);
+ }
+
@Override
public void exportContainerData(OutputStream destination,
ContainerPacker<KeyValueContainerData> packer) throws IOException {
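The rewritten import path boils down to an unpack-to-tmp, move-into-place protocol with cleanup on failure. A condensed, hypothetical re-statement of that flow (locking, descriptor handling and the SCHEMA_V3 branch elided; the packer interface below is a stand-in for the real ContainerPacker):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;

public final class ImportFlowSketch {
  interface PackerLike { // stand-in for the real ContainerPacker
    byte[] unpack(InputStream in, Path tmpDir, Path destDir) throws IOException;
  }

  static byte[] unpackWithCleanup(PackerLike packer, InputStream input,
      Path tmpDir, Path destContainerDir, long containerId) throws IOException {
    try {
      // Extracts under tmpDir/<containerId>, then renames to destContainerDir.
      return packer.unpack(input, tmpDir, destContainerDir);
    } catch (IOException ex) {
      // Remove the partially unpacked data so a retry starts clean.
      FileUtils.deleteDirectory(
          tmpDir.resolve(String.valueOf(containerId)).toFile());
      throw ex;
    }
  }
}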
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index d003af86be..7a4cc5807e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
-import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
@@ -381,13 +380,11 @@ public class KeyValueHandler extends Handler {
return getSuccessResponse(request);
}
- private void populateContainerPathFields(KeyValueContainer container)
- throws IOException {
+ private void populateContainerPathFields(KeyValueContainer container,
+ HddsVolume hddsVolume) throws IOException {
volumeSet.readLock();
+ HddsVolume containerVolume = hddsVolume;
try {
- HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
- StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
- container.getContainerData().getMaxSize());
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
containerVolume, clusterId);
container.populatePathFields(idDir, containerVolume);
@@ -1014,8 +1011,7 @@ public class KeyValueHandler extends Handler {
@Override
public Container importContainer(ContainerData originalContainerData,
final InputStream rawContainerStream,
- final TarContainerPacker packer)
- throws IOException {
+ final TarContainerPacker packer) throws IOException {
Preconditions.checkState(originalContainerData instanceof
KeyValueContainerData, "Should be KeyValueContainerData instance");
@@ -1025,7 +1021,8 @@ public class KeyValueHandler extends Handler {
KeyValueContainer container = new KeyValueContainer(containerData,
conf);
- populateContainerPathFields(container);
+ HddsVolume targetVolume = originalContainerData.getVolume();
+ populateContainerPathFields(container, targetVolume);
container.importContainerData(rawContainerStream, packer);
sendICR(container);
return container;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 4c851979ed..3d8c445a84 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -28,15 +28,19 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Objects;
+import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;
+import java.util.Objects;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
@@ -44,10 +48,13 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
/**
@@ -83,48 +90,35 @@ public class TarContainerPacker
*/
@Override
public byte[] unpackContainerData(Container<KeyValueContainerData> container,
- InputStream input)
+ InputStream input, Path tmpDir, Path destContainerDir)
throws IOException {
- byte[] descriptorFileContent = null;
KeyValueContainerData containerData = container.getContainerData();
- Path dbRoot = getDbPath(containerData);
- Path chunksRoot = Paths.get(containerData.getChunksPath());
+ long containerId = containerData.getContainerID();
- try (InputStream decompressed = decompress(input);
- ArchiveInputStream archiveInput = untar(decompressed)) {
+ Path containerUntarDir = tmpDir.resolve(String.valueOf(containerId));
+ if (containerUntarDir.toFile().exists()) {
+ FileUtils.deleteDirectory(containerUntarDir.toFile());
+ }
- ArchiveEntry entry = archiveInput.getNextEntry();
- while (entry != null) {
- String name = entry.getName();
- long size = entry.getSize();
- if (name.startsWith(DB_DIR_NAME + "/")) {
- Path destinationPath = dbRoot
- .resolve(name.substring(DB_DIR_NAME.length() + 1));
- extractEntry(entry, archiveInput, size, dbRoot,
- destinationPath);
- } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
- Path destinationPath = chunksRoot
- .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
- extractEntry(entry, archiveInput, size, chunksRoot,
- destinationPath);
- } else if (CONTAINER_FILE_NAME.equals(name)) {
- //Don't do anything. Container file should be unpacked in a
- //separated step by unpackContainerDescriptor call.
- descriptorFileContent = readEntry(archiveInput, size);
- } else {
- throw new IllegalArgumentException(
- "Unknown entry in the tar file: " + "" + name);
- }
- entry = archiveInput.getNextEntry();
- }
- return descriptorFileContent;
+ Path dbRoot = getDbPath(containerUntarDir, containerData);
+ Path chunksRoot = getChunkPath(containerUntarDir, containerData);
+ byte[] descriptorFileContent = innerUnpack(input, dbRoot, chunksRoot);
- } catch (CompressorException e) {
- throw new IOException(
- "Can't uncompress the given container: " + container
- .getContainerData().getContainerID(),
- e);
+ if (!Files.exists(destContainerDir)) {
+ Files.createDirectories(destContainerDir);
+ }
+ if (FileUtils.isEmptyDirectory(destContainerDir.toFile())) {
+ Files.move(containerUntarDir, destContainerDir,
+ StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ } else {
+ String errorMessage = "Container " + containerId +
+ " unpack failed because ContainerFile " +
+ destContainerDir.toAbsolutePath() + " already exists";
+ throw new StorageContainerException(errorMessage,
+ CONTAINER_ALREADY_EXISTS);
}
+ return descriptorFileContent;
}
private void extractEntry(ArchiveEntry entry, InputStream input, long size,
@@ -223,6 +217,33 @@ public class TarContainerPacker
}
}
+ public static Path getDbPath(Path baseDir,
+ KeyValueContainerData containerData) {
+ if (baseDir.toAbsolutePath().toString().equals(
+ containerData.getContainerPath())) {
+ return getDbPath(containerData);
+ }
+ Path containerPath = Paths.get(containerData.getContainerPath());
+ Path dbPath = Paths.get(containerData.getDbFile().getPath());
+ Path relativePath = containerPath.relativize(dbPath);
+
+ if (containerData.getSchemaVersion().equals(SCHEMA_V3)) {
+ Path metadataDir = KeyValueContainerLocationUtil.getContainerMetaDataPath(
+ baseDir.toString()).toPath();
+ return DatanodeStoreSchemaThreeImpl.getDumpDir(metadataDir.toFile())
+ .toPath();
+ } else {
+ return baseDir.resolve(relativePath);
+ }
+ }
+
+ public static Path getChunkPath(Path baseDir,
+ KeyValueContainerData containerData) {
+ Path chunkDir = KeyValueContainerLocationUtil.getChunksLocationPath(
+ baseDir.toString()).toPath();
+ return chunkDir;
+ }
+
private byte[] readEntry(InputStream input, final long size)
throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
@@ -289,4 +310,40 @@ public class TarContainerPacker
.createCompressorOutputStream(compression, output);
}
+ private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
+ throws IOException {
+ byte[] descriptorFileContent = null;
+ try (InputStream decompressed = decompress(input);
+ ArchiveInputStream archiveInput = untar(decompressed)) {
+ ArchiveEntry entry = archiveInput.getNextEntry();
+ while (entry != null) {
+ String name = entry.getName();
+ long size = entry.getSize();
+ if (name.startsWith(DB_DIR_NAME + "/")) {
+ Path destinationPath = dbRoot
+ .resolve(name.substring(DB_DIR_NAME.length() + 1));
+ extractEntry(entry, archiveInput, size, dbRoot,
+ destinationPath);
+ } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
+ Path destinationPath = chunksRoot
+ .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
+ extractEntry(entry, archiveInput, size, chunksRoot,
+ destinationPath);
+ } else if (CONTAINER_FILE_NAME.equals(name)) {
+ //Don't do anything. The container file is unpacked in a
+ //separate step by the unpackContainerDescriptor call.
+ descriptorFileContent = readEntry(archiveInput, size);
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown entry in the tar file: " + name);
+ }
+ entry = archiveInput.getNextEntry();
+ }
+ return descriptorFileContent;
+
+ } catch (CompressorException e) {
+ throw new IOException("Can't uncompress to dbRoot: " + dbRoot +
+ ", chunksRoot: " + chunksRoot, e);
+ }
+ }
}
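The essential change above: entries are extracted under tmpDir/<containerId> first, and that directory is renamed into its final location only when extraction succeeds and the destination is empty. Because getUntarDirectory places tmpDir on the same HddsVolume as the destination, the rename is a cheap same-filesystem move. That step in isolation (paths are made up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class AtomicMoveDemo {
  public static void main(String[] args) throws IOException {
    Path untarDir = Paths.get("/data/hdds/tmp/container-copy/42");
    Path destDir = Paths.get("/data/hdds/CID-cluster1/current/containerDir0/42");
    Files.createDirectories(untarDir);
    Files.createDirectories(destDir.getParent());
    // Same-volume rename: the whole container appears at destDir at once,
    // so readers never observe a half-extracted container.
    Files.move(untarDir, destDir,
        StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
  }
}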
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
index 13a5a69811..591efe04e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
@@ -45,15 +45,22 @@ public final class KeyValueContainerLocationUtil {
public static File getContainerMetaDataPath(String hddsVolumeDir,
String clusterId,
long containerId) {
- String containerMetaDataPath =
- getBaseContainerLocation(hddsVolumeDir, clusterId,
- containerId);
- containerMetaDataPath = containerMetaDataPath + File.separator +
+ return getContainerMetaDataPath(
+ getBaseContainerLocation(hddsVolumeDir, clusterId, containerId));
+ }
+
+ /**
+ * Returns Container Metadata Location.
+ * @param containerBaseDir Base container dir
+ * @return containerMetadata Path to container metadata location where
+ * .container file will be stored.
+ */
+ public static File getContainerMetaDataPath(String containerBaseDir) {
+ String containerMetaDataPath = containerBaseDir + File.separator +
OzoneConsts.CONTAINER_META_PATH;
return new File(containerMetaDataPath);
}
-
/**
* Returns Container Chunks Location.
* @param baseDir
@@ -63,8 +70,17 @@ public final class KeyValueContainerLocationUtil {
*/
public static File getChunksLocationPath(String baseDir, String clusterId,
long containerId) {
- String chunksPath =
- getBaseContainerLocation(baseDir, clusterId, containerId)
+ return getChunksLocationPath(
+ getBaseContainerLocation(baseDir, clusterId, containerId));
+ }
+
+ /**
+ * Returns Container Chunks Location.
+ * @param containerBaseDir Base container dir
+ * @return chunksPath
+ */
+ public static File getChunksLocationPath(String containerBaseDir) {
+ String chunksPath = containerBaseDir
+ File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
return new File(chunksPath);
}
@@ -76,7 +92,7 @@ public final class KeyValueContainerLocationUtil {
* @param containerId
* @return base directory for container.
*/
- private static String getBaseContainerLocation(String hddsVolumeDir,
+ public static String getBaseContainerLocation(String hddsVolumeDir,
String clusterId,
long containerId) {
Preconditions.checkNotNull(hddsVolumeDir, "Base Directory cannot be null");
@@ -111,7 +127,12 @@ public final class KeyValueContainerLocationUtil {
return new File(containerData.getVolume().getDbParentDir(),
OzoneConsts.CONTAINER_DB_NAME);
}
- return new File(containerData.getMetadataPath(),
- containerData.getContainerID() + OzoneConsts.DN_CONTAINER_DB);
+ return getContainerDBFile(containerData.getMetadataPath(), containerData);
+ }
+
+ public static File getContainerDBFile(String baseDir,
+ KeyValueContainerData containerData) {
+ return new File(baseDir, containerData.getContainerID() +
+ OzoneConsts.DN_CONTAINER_DB);
}
}
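The refactoring splits each location helper into an id-based overload and a base-dir overload, which is what lets TarContainerPacker compute metadata and chunks paths under an arbitrary base directory such as the tmp untar dir. A usage sketch with made-up values:

import java.io.File;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;

public class LocationDemo {
  public static void main(String[] args) {
    String base = KeyValueContainerLocationUtil.getBaseContainerLocation(
        "/data/hdds", "CID-cluster1", 42L);
    // The single-argument overloads just append the fixed sub-directories:
    File meta = KeyValueContainerLocationUtil.getContainerMetaDataPath(base);
    File chunks = KeyValueContainerLocationUtil.getChunksLocationPath(base);
    System.out.println(meta);   // <base>/metadata
    System.out.println(chunks); // <base>/chunks
  }
}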
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 171303dc0b..4087483d72 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -156,8 +156,7 @@ public class ContainerController {
public Container importContainer(
final ContainerData containerData,
final InputStream rawContainerStream,
- final TarContainerPacker packer)
- throws IOException {
+ final TarContainerPacker packer) throws IOException {
return handlers.get(containerData.getContainerType())
.importContainer(containerData, rawContainerStream, packer);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
index cd4ebffd61..45c70f8e72 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -34,6 +34,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
public interface ContainerDownloader extends Closeable {
Path getContainerDataFromReplicas(long containerId,
- List<DatanodeDetails> sources);
+ List<DatanodeDetails> sources, Path downloadDir);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index c97bd27e93..c1d99c9148 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -21,13 +21,22 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.List;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
@@ -35,6 +44,8 @@ import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+
/**
* Default replication implementation.
* <p>
@@ -46,36 +57,59 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
public static final Logger LOG =
LoggerFactory.getLogger(DownloadAndImportReplicator.class);
- private final ContainerSet containerSet;
+ public static final String CONTAINER_COPY_DIR = "container-copy";
+ public static final String CONTAINER_COPY_TMP_DIR = "tmp";
+ private final ContainerSet containerSet;
private final ContainerController controller;
-
private final ContainerDownloader downloader;
-
private final TarContainerPacker packer;
+ private final MutableVolumeSet volumeSet;
+ private final VolumeChoosingPolicy volumeChoosingPolicy;
+ private final long containerSize;
public DownloadAndImportReplicator(
+ ConfigurationSource conf,
ContainerSet containerSet,
ContainerController controller,
ContainerDownloader downloader,
- TarContainerPacker packer) {
+ TarContainerPacker packer,
+ MutableVolumeSet volumeSet) {
this.containerSet = containerSet;
this.controller = controller;
this.downloader = downloader;
this.packer = packer;
+ this.volumeSet = volumeSet;
+ try {
+ this.volumeChoosingPolicy = conf.getClass(
+ HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
+ .class, VolumeChoosingPolicy.class).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.containerSize = (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
}
- public void importContainer(long containerID, Path tarFilePath)
- throws IOException {
+ public void importContainer(long containerID, Path tarFilePath,
+ HddsVolume hddsVolume) throws IOException {
+
+ HddsVolume targetVolume = hddsVolume;
+ if (targetVolume == null) {
+ targetVolume = chooseNextVolume();
+ }
+ KeyValueContainerData originalContainerData;
try {
- ContainerData originalContainerData;
- try (FileInputStream tempContainerTarStream = new FileInputStream(
+ try (FileInputStream tmpContainerTarStream = new FileInputStream(
tarFilePath.toFile())) {
byte[] containerDescriptorYaml =
- packer.unpackContainerDescriptor(tempContainerTarStream);
- originalContainerData = ContainerDataYaml.readContainer(
- containerDescriptorYaml);
+ packer.unpackContainerDescriptor(tmpContainerTarStream);
+ originalContainerData = (KeyValueContainerData) ContainerDataYaml
+ .readContainer(containerDescriptorYaml);
}
+ originalContainerData.setVolume(targetVolume);
try (FileInputStream tempContainerTarStream = new FileInputStream(
tarFilePath.toFile())) {
@@ -105,26 +139,46 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
LOG.info("Starting replication of container {} from {}", containerID,
sourceDatanodes);
- // Wait for the download. This thread pool is limiting the parallel
- // downloads, so it's ok to block here and wait for the full download.
- Path path =
- downloader.getContainerDataFromReplicas(containerID, sourceDatanodes);
- if (path == null) {
- task.setStatus(Status.FAILED);
- } else {
- try {
- long bytes = Files.size(path);
- LOG.info("Container {} is downloaded with size {}, starting to
import.",
- containerID, bytes);
- task.setTransferredBytes(bytes);
-
- importContainer(containerID, path);
- LOG.info("Container {} is replicated successfully", containerID);
- task.setStatus(Status.DONE);
- } catch (IOException e) {
- LOG.error("Container {} replication was unsuccessful.", containerID,
e);
+ try {
+ HddsVolume targetVolume = chooseNextVolume();
+ // Wait for the download. This thread pool is limiting the parallel
+ // downloads, so it's ok to block here and wait for the full download.
+ Path tarFilePath =
+ downloader.getContainerDataFromReplicas(containerID, sourceDatanodes,
+ getUntarDirectory(targetVolume));
+ if (tarFilePath == null) {
task.setStatus(Status.FAILED);
+ return;
}
+ long bytes = Files.size(tarFilePath);
+ LOG.info("Container {} is downloaded with size {}, starting to import.",
+ containerID, bytes);
+ task.setTransferredBytes(bytes);
+
+ importContainer(containerID, tarFilePath, targetVolume);
+
+ LOG.info("Container {} is replicated successfully", containerID);
+ task.setStatus(Status.DONE);
+ } catch (IOException e) {
+ LOG.error("Container {} replication was unsuccessful.", containerID, e);
+ task.setStatus(Status.FAILED);
}
}
+
+ private HddsVolume chooseNextVolume() throws IOException {
+ // Choose volume that can hold both container in tmp and dest directory
+ return volumeChoosingPolicy.chooseVolume(
+ StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+ containerSize * 2);
+ }
+
+ public static Path getUntarDirectory(HddsVolume hddsVolume)
+ throws IOException {
+ return Paths.get(hddsVolume.getVolumeRootDir())
+ .resolve(CONTAINER_COPY_TMP_DIR).resolve(CONTAINER_COPY_DIR);
+ }
+
+ private List<HddsVolume> getHddsVolumesList() {
+ return StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
+ }
}
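Two details worth noting: the working directory now lives under each volume root (volumeRoot/tmp/container-copy, per getUntarDirectory), and chooseNextVolume asks for twice the container size because the compressed tarball and the unpacked copy coexist on the chosen volume until the final rename. A small sketch of the directory derivation (the volume path is made up):

import java.nio.file.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;

public class UntarDirDemo {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Throwaway volume rooted at a made-up path.
    HddsVolume vol = new HddsVolume.Builder("/tmp/vol1").conf(conf).build();
    Path work = DownloadAndImportReplicator.getUntarDirectory(vol);
    System.out.println(work); // expected: /tmp/vol1/tmp/container-copy
  }
}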
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 580b725fc4..b8c2c6fbb4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
import org.apache.hadoop.ozone.OzoneConsts;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -107,8 +108,8 @@ public class GrpcReplicationClient implements AutoCloseable {
CompletableFuture<Path> response = new CompletableFuture<>();
- Path destinationPath =
- getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
+ Path destinationPath = getWorkingDirectory()
+ .resolve(ContainerUtils.getContainerTarGzName(containerId));
client.download(request,
new StreamDownloader(containerId, response, destinationPath));
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index e46cd7c1b1..329fd64fa2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -32,12 +32,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator.CONTAINER_COPY_DIR;
+
/**
* Simple ContainerDownloaderImplementation to download the missing container
@@ -51,22 +52,12 @@ public class SimpleContainerDownloader implements ContainerDownloader {
public static final Logger LOG =
LoggerFactory.getLogger(SimpleContainerDownloader.class);
- private final Path workingDirectory;
private final SecurityConfig securityConfig;
private final CertificateClient certClient;
private final String compression;
- public SimpleContainerDownloader(ConfigurationSource conf,
- CertificateClient certClient) {
- String workDirString =
- conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
-
- if (workDirString == null) {
- workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
- .resolve("container-copy");
- } else {
- workingDirectory = Paths.get(workDirString);
- }
+ public SimpleContainerDownloader(
+ ConfigurationSource conf, CertificateClient certClient) {
securityConfig = new SecurityConfig(conf);
this.certClient = certClient;
this.compression = CopyContainerCompression.getConf(conf).toString();
@@ -74,7 +65,13 @@ public class SimpleContainerDownloader implements ContainerDownloader {
@Override
public Path getContainerDataFromReplicas(
- long containerId, List<DatanodeDetails> sourceDatanodes) {
+ long containerId, List<DatanodeDetails> sourceDatanodes,
+ Path downloadDir) {
+
+ if (downloadDir == null) {
+ downloadDir = Paths.get(System.getProperty("java.io.tmpdir"))
+ .resolve(CONTAINER_COPY_DIR);
+ }
final List<DatanodeDetails> shuffledDatanodes =
shuffleDatanodes(sourceDatanodes);
@@ -82,7 +79,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
for (DatanodeDetails datanode : shuffledDatanodes) {
try {
CompletableFuture<Path> result =
- downloadContainer(containerId, datanode);
+ downloadContainer(containerId, datanode, downloadDir);
return result.get();
} catch (ExecutionException | IOException e) {
LOG.error("Error on replicating container: {} from {}/{}", containerId,
@@ -115,12 +112,14 @@ public class SimpleContainerDownloader implements ContainerDownloader {
@VisibleForTesting
protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode) throws IOException {
+ long containerId, DatanodeDetails datanode, Path downloadDir)
+ throws IOException {
CompletableFuture<Path> result;
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.REPLICATION).getValue(),
- workingDirectory, securityConfig, certClient, compression);
+ downloadDir, securityConfig, certClient, compression);
+
result = grpcReplicationClient.download(containerId)
.whenComplete((r, ex) -> {
try {
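On the caller side the download directory is now an explicit argument; passing null (as the TLS tests below do) falls back to ${java.io.tmpdir}/container-copy instead of the removed OZONE_CONTAINER_COPY_WORKDIR lookup. A hypothetical call showing the fallback:

import java.nio.file.Path;
import java.util.Collections;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;

public class DownloadDirFallbackDemo {
  public static void main(String[] args) {
    SimpleContainerDownloader downloader =
        new SimpleContainerDownloader(new OzoneConfiguration(), null);
    // null downloadDir -> ${java.io.tmpdir}/container-copy; with no source
    // datanodes this simply returns null.
    Path tar = downloader.getContainerDataFromReplicas(
        42L, Collections.emptyList(), null);
    System.out.println(tar);
  }
}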
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
index 6ae0461fc9..51b929eb2b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -57,4 +58,13 @@ public class TestContainerUtils {
assertEquals("<redacted>", dataBuffers.getBuffers(0).toString(UTF_8));
}
+ @Test
+ public void testTarGzName() throws IOException {
+ long containerId = 100;
+ String tarGzName = "container-100.tar.gz";
+ assertEquals(tarGzName, ContainerUtils.getContainerTarGzName(containerId));
+
+ assertEquals(containerId,
+ ContainerUtils.retrieveContainerIdFromTarGzName(tarGzName));
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index 2d83471691..9e16fb88bc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -136,15 +136,22 @@ public class TestTarContainerPacker {
}
private KeyValueContainerData createContainer(Path dir) throws IOException {
+ return createContainer(dir, true);
+ }
+
+ private KeyValueContainerData createContainer(Path dir, boolean createDir)
+ throws IOException {
long id = CONTAINER_ID.getAndIncrement();
- Path containerDir = dir.resolve("container" + id);
+ Path containerDir = dir.resolve(String.valueOf(id));
Path dbDir = containerDir.resolve("db");
- Path dataDir = containerDir.resolve("data");
+ Path dataDir = containerDir.resolve("chunks");
Path metaDir = containerDir.resolve("metadata");
- Files.createDirectories(metaDir);
- Files.createDirectories(dbDir);
- Files.createDirectories(dataDir);
+ if (createDir) {
+ Files.createDirectories(metaDir);
+ Files.createDirectories(dbDir);
+ Files.createDirectories(dataDir);
+ }
KeyValueContainerData containerData = new KeyValueContainerData(
id, layout,
@@ -211,7 +218,7 @@ public class TestTarContainerPacker {
}
KeyValueContainerData destinationContainerData =
- createContainer(DEST_CONTAINER_ROOT);
+ createContainer(DEST_CONTAINER_ROOT, false);
KeyValueContainer destinationContainer =
new KeyValueContainer(destinationContainerData, conf);
@@ -221,7 +228,10 @@ public class TestTarContainerPacker {
//unpackContainerData
try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
descriptor =
- new String(packer.unpackContainerData(destinationContainer, input),
+ new String(packer.unpackContainerData(destinationContainer, input,
+ TEMP_DIR,
+ DEST_CONTAINER_ROOT.resolve(String.valueOf(
+ destinationContainer.getContainerData().getContainerID()))),
UTF_8);
}
@@ -316,9 +326,10 @@ public class TestTarContainerPacker {
private KeyValueContainerData unpackContainerData(File containerFile)
throws IOException {
try (FileInputStream input = new FileInputStream(containerFile)) {
- KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
+ KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT, false);
KeyValueContainer container = new KeyValueContainer(data, conf);
- packer.unpackContainerData(container, input);
+ packer.unpackContainerData(container, input, TEMP_DIR,
+ DEST_CONTAINER_ROOT.resolve(String.valueOf(data.getContainerID())));
return data;
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index d9770a14e5..2ea47f5972 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.ozone.container.replication;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.time.Instant;
import java.time.ZoneId;
import java.util.List;
@@ -36,6 +38,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
@@ -245,21 +249,30 @@ public class TestReplicationSupervisor {
}
@Test
- public void testDownloadAndImportReplicatorFailure() {
+ public void testDownloadAndImportReplicatorFailure() throws IOException {
ReplicationSupervisor supervisor =
new ReplicationSupervisor(set, context, mutableReplicator,
newDirectExecutorService(), clock);
+ OzoneConfiguration conf = new OzoneConfiguration();
// Mock to fetch an exception in the importContainer method.
SimpleContainerDownloader moc =
Mockito.mock(SimpleContainerDownloader.class);
Path res = Paths.get("file:/tmp/no-such-file");
Mockito.when(
- moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList()))
+ moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(),
+ Mockito.any(Path.class)))
.thenReturn(res);
+ final String testDir = GenericTestUtils.getTempPath(
+ TestReplicationSupervisor.class.getSimpleName() +
+ "-" + UUID.randomUUID().toString());
+ MutableVolumeSet volumeSet = Mockito.mock(MutableVolumeSet.class);
+ Mockito.when(volumeSet.getVolumesList())
+ .thenReturn(Collections.singletonList(
+ new HddsVolume.Builder(testDir).conf(conf).build()));
ContainerReplicator replicatorFactory =
- new DownloadAndImportReplicator(set, null, moc, null);
+ new DownloadAndImportReplicator(conf, set, null, moc, null, volumeSet);
replicatorRef.set(replicatorFactory);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index bc83bf9bcb..42d44a7c23 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.ozone.container.replication;
+import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -31,8 +34,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
@@ -40,6 +49,9 @@ import org.junit.jupiter.api.Timeout;
*/
public class TestSimpleContainerDownloader {
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
private static final String SUCCESS_PATH = "downloaded";
@Test
@@ -53,7 +65,8 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes);
+ downloader.getContainerDataFromReplicas(1L, datanodes,
+ tempDir.newFolder().toPath());
//THEN
Assertions.assertEquals(datanodes.get(0).getUuidString(),
@@ -72,7 +85,8 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes);
+ downloader.getContainerDataFromReplicas(1L, datanodes,
+ tempDir.newFolder().toPath());
//THEN
//first datanode is failed, second worked
@@ -91,7 +105,8 @@ public class TestSimpleContainerDownloader {
//WHEN
final Path result =
- downloader.getContainerDataFromReplicas(1L, datanodes);
+ downloader.getContainerDataFromReplicas(1L, datanodes,
+ tempDir.newFolder().toPath());
//THEN
//first datanode is failed, second worked
@@ -105,7 +120,7 @@ public class TestSimpleContainerDownloader {
@Test
@Timeout(10)
public void testRandomSelection()
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException, IOException {
//GIVEN
final List<DatanodeDetails> datanodes = createDatanodes();
@@ -115,7 +130,7 @@ public class TestSimpleContainerDownloader {
@Override
protected CompletableFuture<Path> downloadContainer(
- long containerId, DatanodeDetails datanode
+ long containerId, DatanodeDetails datanode, Path downloadPath
) {
//download is always successful.
return CompletableFuture
@@ -126,7 +141,8 @@ public class TestSimpleContainerDownloader {
//WHEN executed, THEN at least once the second datanode should be
//returned.
for (int i = 0; i < 10000; i++) {
- Path path = downloader.getContainerDataFromReplicas(1L, datanodes);
+ Path path = downloader.getContainerDataFromReplicas(1L, datanodes,
+ tempDir.newFolder().toPath());
if (path.toString().equals(datanodes.get(1).getUuidString())) {
return;
}
@@ -167,8 +183,7 @@ public class TestSimpleContainerDownloader {
@Override
protected CompletableFuture<Path> downloadContainer(
- long containerId,
- DatanodeDetails datanode
+ long containerId, DatanodeDetails datanode, Path downloadPath
) {
if (datanodes.contains(datanode)) {
@@ -197,4 +212,21 @@ public class TestSimpleContainerDownloader {
datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
return datanodes;
}
+
+ private VolumeSet getVolumeSet(DatanodeDetails datanodeDetails,
+ OzoneConfiguration conf) throws IOException {
+ String clusterId = UUID.randomUUID().toString();
+ int volumeNum = 3;
+ File[] hddsVolumeDirs = new File[volumeNum];
+ StringBuilder hddsDirs = new StringBuilder();
+ for (int i = 0; i < volumeNum; i++) {
+ hddsVolumeDirs[i] = tempDir.newFolder();
+ hddsDirs.append(hddsVolumeDirs[i]).append(",");
+ }
+ conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, hddsDirs.toString());
+ VolumeSet hddsVolumeSet = new MutableVolumeSet(
+ datanodeDetails.getUuidString(), clusterId, conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ return hddsVolumeSet;
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index ec0d9bc335..1e01d08080 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
@@ -666,15 +667,17 @@ public class TestDatanodeUpgradeToScmHA {
*/
public void importContainer(long containerID, File source) throws Exception {
DownloadAndImportReplicator replicator =
- new DownloadAndImportReplicator(dsm.getContainer().getContainerSet(),
+ new DownloadAndImportReplicator(dsm.getConf(),
+ dsm.getContainer().getContainerSet(),
dsm.getContainer().getController(),
new SimpleContainerDownloader(conf, null),
- new TarContainerPacker());
+ new TarContainerPacker(), dsm.getContainer().getVolumeSet());
- File tempFile = tempFolder.newFile();
+ File tempFile = tempFolder.newFile(
+ ContainerUtils.getContainerTarGzName(containerID));
Files.copy(source.toPath(), tempFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
- replicator.importContainer(containerID, tempFile.toPath());
+ replicator.importContainer(containerID, tempFile.toPath(), null);
}
public void dispatchRequest(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 8249126a43..b9e338870f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -856,8 +856,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
reservedSpaceString);
dnConf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
ratisDir.toString());
- dnConf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
- workDir.toString());
if (reconServer != null) {
OzoneStorageContainerManager reconScm =
reconServer.getReconStorageContainerManager();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index 8897e30897..19fe829efe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -271,7 +271,7 @@ public class TestOzoneContainerWithTLS {
SimpleContainerDownloader downloader =
new SimpleContainerDownloader(conf, caClient);
Path file = downloader.getContainerDataFromReplicas(
- containerId, sourceDatanodes);
+ containerId, sourceDatanodes, null);
downloader.close();
Assert.assertNull(file);
Assert.assertTrue(logCapture.getOutput().contains(
@@ -308,7 +308,8 @@ public class TestOzoneContainerWithTLS {
for (Long cId : containerIdList) {
downloader = new SimpleContainerDownloader(conf, caClient);
try {
- file = downloader.getContainerDataFromReplicas(cId, sourceDatanodes);
+ file = downloader.getContainerDataFromReplicas(cId, sourceDatanodes,
+ null);
downloader.close();
Assert.assertNotNull(file);
} finally {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index 0f0a95f71f..1c0ba2a80c 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -203,10 +203,10 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
new ContainerController(containerSet, handlers);
ContainerReplicator replicator =
- new DownloadAndImportReplicator(containerSet,
+ new DownloadAndImportReplicator(conf, containerSet,
controller,
new SimpleContainerDownloader(conf, null),
- new TarContainerPacker());
+ new TarContainerPacker(), null);
ReplicationServer.ReplicationConfig replicationConfig
= conf.getObject(ReplicationServer.ReplicationConfig.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]