This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4184499e80 HDDS-7623. Do not compress container re-replication traffic by default (#4089)
4184499e80 is described below
commit 4184499e80ef9c7c9e613d55a64cdc8319ab2e6a
Author: Chung En Lee <[email protected]>
AuthorDate: Tue Dec 27 12:34:34 2022 +0800
HDDS-7623. Do not compress container re-replication traffic by default (#4089)
---
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 2 +
.../common/src/main/resources/ozone-default.xml | 8 ++
hadoop-hdds/container-service/pom.xml | 4 +
.../container/keyvalue/TarContainerPacker.java | 34 +++--
.../replication/ContainerReplicationSource.java | 5 +-
.../replication/CopyContainerCompression.java | 78 ++++++++++++
.../replication/GrpcReplicationClient.java | 12 +-
.../replication/GrpcReplicationService.java | 8 +-
.../OnDemandContainerReplicationSource.java | 19 ++-
.../replication/SimpleContainerDownloader.java | 5 +-
.../container/keyvalue/TestKeyValueContainer.java | 141 +++++++++++----------
.../container/keyvalue/TestTarContainerPacker.java | 37 ++++--
.../upgrade/TestDatanodeUpgradeToScmHA.java | 2 +-
.../src/main/proto/DatanodeClientProtocol.proto | 9 ++
hadoop-ozone/dist/src/main/license/bin/LICENSE.txt | 1 +
hadoop-ozone/dist/src/main/license/jar-report.txt | 1 +
.../ozone/debug/container/ExportSubcommand.java | 4 +-
pom.xml | 6 +
18 files changed, 273 insertions(+), 103 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index bd96f0c26a..64772553e0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -194,6 +194,8 @@ public final class HddsConfigKeys {
public static final String HDDS_X509_RENEW_GRACE_DURATION_DEFAULT = "P28D";
+ public static final String HDDS_CONTAINER_REPLICATION_COMPRESSION =
+ "hdds.container.replication.compression";
public static final String HDDS_X509_ROOTCA_CERTIFICATE_FILE =
"hdds.x509.rootca.certificate.file";
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 35fb3d7c23..d23473ec85 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2117,6 +2117,14 @@
<tag>OZONE, HDDS, SECURITY</tag>
<description>X509 signature certificate.</description>
</property>
+ <property>
+ <name>hdds.container.replication.compression</name>
+ <value>NO_COMPRESSION</value>
+ <tag>OZONE, HDDS, DATANODE</tag>
+ <description>Compression algorithm used for closed container replication.
+    Possible choices include NO_COMPRESSION, GZIP, SNAPPY, LZ4, ZSTD.
+ </description>
+ </property>
<property>
<name>hdds.x509.rootca.certificate.file</name>
<value></value>
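The property ships with NO_COMPRESSION as its default, so upgraded clusters keep today's behavior unless an operator opts in through ozone-site.xml. A minimal sketch of such an override (GZIP is just an illustrative choice; any value listed in the description works):

  <property>
    <name>hdds.container.replication.compression</name>
    <value>GZIP</value>
  </property>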
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 9bfd9433bb..3f1990bfc0 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -42,6 +42,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-common</artifactId>
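The zstd-jni dependency matters because commons-compress binds Zstandard reflectively and only offers the codec when the JNI library is on the classpath. A quick probe, assuming the commons-compress 1.21 factory API pinned in the root pom (class name is illustrative):

import org.apache.commons.compress.compressors.CompressorStreamFactory;

public final class CodecProbe {
  public static void main(String[] args) {
    // Codec names the factory can create output streams for;
    // "zstd" appears only when zstd-jni is present.
    System.out.println(
        new CompressorStreamFactory().getOutputStreamCompressorNames());
  }
}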
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index e555c10814..4c851979ed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -28,8 +28,10 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Objects;
import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -60,6 +62,18 @@ public class TarContainerPacker
private static final String CONTAINER_FILE_NAME = "container.yaml";
+ private final String compression;
+
+ private static final String NO_COMPRESSION = "no_compression";
+
+ public TarContainerPacker() {
+ this.compression = NO_COMPRESSION;
+ }
+
+ public TarContainerPacker(String compression) {
+ this.compression = compression;
+ }
+
/**
* Given an input stream (tar file) extract the data to the specified
* directories.
@@ -77,7 +91,7 @@ public class TarContainerPacker
Path chunksRoot = Paths.get(containerData.getChunksPath());
try (InputStream decompressed = decompress(input);
- ArchiveInputStream archiveInput = untar(decompressed)) {
+ ArchiveInputStream archiveInput = untar(decompressed)) {
ArchiveEntry entry = archiveInput.getNextEntry();
while (entry != null) {
@@ -180,7 +194,7 @@ public class TarContainerPacker
public byte[] unpackContainerDescriptor(InputStream input)
throws IOException {
try (InputStream decompressed = decompress(input);
- ArchiveInputStream archiveInput = untar(decompressed)) {
+ ArchiveInputStream archiveInput = untar(decompressed)) {
ArchiveEntry entry = archiveInput.getNextEntry();
while (entry != null) {
@@ -259,16 +273,20 @@ public class TarContainerPacker
return new TarArchiveOutputStream(output);
}
- private static InputStream decompress(InputStream input)
+ @VisibleForTesting
+ InputStream decompress(InputStream input)
throws CompressorException {
- return new CompressorStreamFactory()
- .createCompressorInputStream(CompressorStreamFactory.GZIP, input);
+ return Objects.equals(compression, NO_COMPRESSION) ?
+ input : new CompressorStreamFactory()
+ .createCompressorInputStream(compression, input);
}
- private static OutputStream compress(OutputStream output)
+ @VisibleForTesting
+ OutputStream compress(OutputStream output)
throws CompressorException {
- return new CompressorStreamFactory()
- .createCompressorOutputStream(CompressorStreamFactory.GZIP, output);
+ return Objects.equals(compression, NO_COMPRESSION) ?
+ output : new CompressorStreamFactory()
+ .createCompressorOutputStream(compression, output);
}
}
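For readers unfamiliar with the factory API used above: passing an explicit codec name to CompressorStreamFactory selects the stream type directly, and the packer bypasses the factory entirely for the no_compression sentinel. A self-contained sketch of that dispatch under the same assumptions (class, file name, and codec choice are illustrative):

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;

public final class DecompressSketch {

  // Mirrors TarContainerPacker.decompress: pass the stream through
  // unchanged for "no_compression", otherwise let the factory wrap it.
  static InputStream wrap(InputStream in, String codec)
      throws CompressorException {
    return "no_compression".equals(codec)
        ? in
        : new CompressorStreamFactory().createCompressorInputStream(codec, in);
  }

  public static void main(String[] args) throws Exception {
    try (InputStream raw = new BufferedInputStream(
             new FileInputStream("container.tar.zst"));
         InputStream tar = wrap(raw, CompressorStreamFactory.ZSTANDARD)) {
      System.out.println(tar.read()); // first byte of the decompressed tar
    }
  }
}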
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
index 69582f799f..72f32b2089 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -40,10 +40,11 @@ public interface ContainerReplicationSource {
* Copy the container data to an output stream.
*
* @param containerId Container to replicate
- * @param destination The destination stream to copy all the container data.
+ * @param destination The destination stream to copy all the container data.
+ * @param compression Compression algorithm.
* @throws IOException
*/
- void copyData(long containerId, OutputStream destination)
+ void copyData(long containerId, OutputStream destination, String compression)
throws IOException;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
new file mode 100644
index 0000000000..67f0d6fc85
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
+
+/**
+ * Defines compression algorithm for container replication.
+ */
+public enum CopyContainerCompression {
+
+ NO_COMPRESSION,
+ GZIP,
+ LZ4,
+ SNAPPY,
+ ZSTD;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CopyContainerCompression.class);
+
+ private static final CopyContainerCompression DEFAULT_COMPRESSION =
+ CopyContainerCompression.NO_COMPRESSION;
+ private static final Map<CopyContainerCompression, String>
+ COMPRESSION_MAPPING = ImmutableMap.copyOf(getMapping());
+
+ private static Map<CopyContainerCompression, String> getMapping() {
+ return new HashMap<CopyContainerCompression, String>() { {
+ put(NO_COMPRESSION, "no_compression");
+ put(GZIP, CompressorStreamFactory.GZIP);
+ put(LZ4, CompressorStreamFactory.LZ4_FRAMED);
+ put(SNAPPY, CompressorStreamFactory.SNAPPY_FRAMED);
+ put(ZSTD, CompressorStreamFactory.ZSTANDARD);
+ }};
+ }
+
+ public static Map<CopyContainerCompression, String> getCompressionMapping() {
+ return COMPRESSION_MAPPING;
+ }
+
+ public static CopyContainerCompression getConf(ConfigurationSource conf) {
+ try {
+ return conf.getEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION,
+ DEFAULT_COMPRESSION);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Unsupported compression codec. Skip compression.");
+ return DEFAULT_COMPRESSION;
+ }
+ }
+
+ public static CopyContainerCompression getDefaultCompression() {
+ return NO_COMPRESSION;
+ }
+}
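Putting the enum together with the packer change above, the selection logic reduces to: read the config key, map the enum to a commons-compress codec name, and hand that name to a TarContainerPacker. A sketch using only names introduced in this diff (the wrapper class is illustrative):

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

public final class PackerSelectionSketch {
  public static TarContainerPacker forConf(OzoneConfiguration conf) {
    // Unknown or missing values fall back to NO_COMPRESSION inside getConf().
    CopyContainerCompression compression =
        CopyContainerCompression.getConf(conf);
    // Translate the enum into the name CompressorStreamFactory understands.
    String codec =
        CopyContainerCompression.getCompressionMapping().get(compression);
    return new TarContainerPacker(codec);
  }
}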
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index e1348614e7..580b725fc4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
@@ -60,8 +61,12 @@ public class GrpcReplicationClient implements AutoCloseable {
private final Path workingDirectory;
- public GrpcReplicationClient(String host, int port, Path workingDir,
- SecurityConfig secConfig, CertificateClient certClient)
+ private final ContainerProtos.CopyContainerCompressProto compression;
+
+ public GrpcReplicationClient(
+ String host, int port, Path workingDir,
+ SecurityConfig secConfig, CertificateClient certClient,
+ String compression)
throws IOException {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port)
@@ -87,6 +92,8 @@ public class GrpcReplicationClient implements AutoCloseable {
channel = channelBuilder.build();
client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
workingDirectory = workingDir;
+ this.compression =
+ ContainerProtos.CopyContainerCompressProto.valueOf(compression);
}
public CompletableFuture<Path> download(long containerId) {
@@ -95,6 +102,7 @@ public class GrpcReplicationClient implements AutoCloseable {
.setContainerID(containerId)
.setLen(-1)
.setReadOffset(0)
+ .setCompression(compression)
.build();
CompletableFuture<Path> response = new CompletableFuture<>();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 60897a5db8..7246ae99f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -49,11 +49,15 @@ public class GrpcReplicationService extends
public void download(CopyContainerRequestProto request,
StreamObserver<CopyContainerResponseProto> responseObserver) {
long containerID = request.getContainerID();
- LOG.info("Streaming container data ({}) to other datanode", containerID);
+ String compression = request.hasCompression() ?
+ request.getCompression().toString() : CopyContainerCompression
+ .getDefaultCompression().toString();
+ LOG.info("Streaming container data ({}) to other datanode " +
+ "with compression {}", containerID, compression);
try {
GrpcOutputStream outputStream =
new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE);
- source.copyData(containerID, outputStream);
+ source.copyData(containerID, outputStream, compression);
} catch (IOException e) {
LOG.error("Error streaming container {}", containerID, e);
responseObserver.onError(e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index 58485ef12f..d17e29b097 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -37,11 +39,16 @@ public class OnDemandContainerReplicationSource
private final ContainerController controller;
- private final TarContainerPacker packer = new TarContainerPacker();
+ private Map<String, TarContainerPacker> packer = new HashMap<>();
public OnDemandContainerReplicationSource(
ContainerController controller) {
this.controller = controller;
+ for (Map.Entry<CopyContainerCompression, String> entry :
+ CopyContainerCompression.getCompressionMapping().entrySet()) {
+ packer.put(
+ entry.getKey().toString(), new TarContainerPacker(entry.getValue()));
+ }
}
@Override
@@ -50,7 +57,8 @@ public class OnDemandContainerReplicationSource
}
@Override
- public void copyData(long containerId, OutputStream destination)
+ public void copyData(long containerId, OutputStream destination,
+ String compression)
throws IOException {
Container container = controller.getContainer(containerId);
@@ -60,8 +68,13 @@ public class OnDemandContainerReplicationSource
" is not found.", CONTAINER_NOT_FOUND);
}
+ if (!packer.containsKey(compression)) {
+ throw new IOException("Can't compress the container. Compression " +
+ compression + " is not found.");
+ }
controller.exportContainer(
- container.getContainerType(), containerId, destination, packer);
+ container.getContainerType(), containerId, destination,
+ packer.get(compression));
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 18fe058728..e46cd7c1b1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Simple ContainerDownloaderImplementation to download the missing container
* from the first available datanode.
@@ -53,6 +54,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
private final Path workingDirectory;
private final SecurityConfig securityConfig;
private final CertificateClient certClient;
+ private final String compression;
public SimpleContainerDownloader(ConfigurationSource conf,
CertificateClient certClient) {
@@ -67,6 +69,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
}
securityConfig = new SecurityConfig(conf);
this.certClient = certClient;
+ this.compression = CopyContainerCompression.getConf(conf).toString();
}
@Override
@@ -117,7 +120,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
GrpcReplicationClient grpcReplicationClient =
new GrpcReplicationClient(datanode.getIpAddress(),
datanode.getPort(Name.REPLICATION).getValue(),
- workingDirectory, securityConfig, certClient);
+ workingDirectory, securityConfig, certClient, compression);
result = grpcReplicationClient.download(containerId)
.whenComplete((r, ex) -> {
try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index b23600ffd8..0ab457a52b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
@@ -220,83 +221,85 @@ public class TestKeyValueContainer {
//destination path
File folderToExport = folder.newFile("exported.tar.gz");
+ for (Map.Entry<CopyContainerCompression, String> entry :
+ CopyContainerCompression.getCompressionMapping().entrySet()) {
+ TarContainerPacker packer = new TarContainerPacker(entry.getValue());
+
+ //export the container
+ try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
+ keyValueContainer
+ .exportContainerData(fos, packer);
+ }
- TarContainerPacker packer = new TarContainerPacker();
+ //delete the original one
+ keyValueContainer.delete();
- //export the container
- try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
- keyValueContainer
- .exportContainerData(fos, packer);
- }
+ //create a new one
+ KeyValueContainerData containerData =
+ new KeyValueContainerData(containerId,
+ keyValueContainerData.getLayoutVersion(),
+ keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+ datanodeId.toString());
+ containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+ KeyValueContainer container = new KeyValueContainer(containerData, CONF);
- //delete the original one
- keyValueContainer.delete();
+ HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+ StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
- //create a new one
- KeyValueContainerData containerData =
- new KeyValueContainerData(containerId,
- keyValueContainerData.getLayoutVersion(),
- keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
- datanodeId.toString());
- containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
- KeyValueContainer container = new KeyValueContainer(containerData, CONF);
-
- HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
- StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
-
- container.populatePathFields(scmId, containerVolume);
- try (FileInputStream fis = new FileInputStream(folderToExport)) {
- container.importContainerData(fis, packer);
- }
-
- assertEquals("value1", containerData.getMetadata().get("key1"));
- assertEquals(keyValueContainerData.getContainerDBType(),
- containerData.getContainerDBType());
- assertEquals(keyValueContainerData.getState(),
- containerData.getState());
- assertEquals(numberOfKeysToWrite,
- containerData.getBlockCount());
- assertEquals(keyValueContainerData.getLayoutVersion(),
- containerData.getLayoutVersion());
- assertEquals(keyValueContainerData.getMaxSize(),
- containerData.getMaxSize());
- assertEquals(keyValueContainerData.getBytesUsed(),
- containerData.getBytesUsed());
-
- //Can't overwrite existing container
- try {
+ container.populatePathFields(scmId, containerVolume);
try (FileInputStream fis = new FileInputStream(folderToExport)) {
container.importContainerData(fis, packer);
}
- fail("Container is imported twice. Previous files are overwritten");
- } catch (IOException ex) {
- //all good
- assertTrue(container.getContainerFile().exists());
- }
- //Import failure should cleanup the container directory
- containerData =
- new KeyValueContainerData(containerId + 1,
- keyValueContainerData.getLayoutVersion(),
- keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
- datanodeId.toString());
- containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
- container = new KeyValueContainer(containerData, CONF);
-
- containerVolume = volumeChoosingPolicy.chooseVolume(
- StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
- container.populatePathFields(scmId, containerVolume);
- try {
- FileInputStream fis = new FileInputStream(folderToExport);
- fis.close();
- container.importContainerData(fis, packer);
- fail("Container import should fail");
- } catch (Exception ex) {
- assertTrue(ex instanceof IOException);
- } finally {
- File directory =
- new File(container.getContainerData().getContainerPath());
- assertFalse(directory.exists());
+ assertEquals("value1", containerData.getMetadata().get("key1"));
+ assertEquals(keyValueContainerData.getContainerDBType(),
+ containerData.getContainerDBType());
+ assertEquals(keyValueContainerData.getState(),
+ containerData.getState());
+ assertEquals(numberOfKeysToWrite,
+ containerData.getBlockCount());
+ assertEquals(keyValueContainerData.getLayoutVersion(),
+ containerData.getLayoutVersion());
+ assertEquals(keyValueContainerData.getMaxSize(),
+ containerData.getMaxSize());
+ assertEquals(keyValueContainerData.getBytesUsed(),
+ containerData.getBytesUsed());
+
+ //Can't overwrite existing container
+ try {
+ try (FileInputStream fis = new FileInputStream(folderToExport)) {
+ container.importContainerData(fis, packer);
+ }
+ fail("Container is imported twice. Previous files are overwritten");
+ } catch (IOException ex) {
+ //all good
+ assertTrue(container.getContainerFile().exists());
+ }
+
+ //Import failure should cleanup the container directory
+ containerData =
+ new KeyValueContainerData(containerId + 1,
+ keyValueContainerData.getLayoutVersion(),
+ keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+ datanodeId.toString());
+ containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+ container = new KeyValueContainer(containerData, CONF);
+
+ containerVolume = volumeChoosingPolicy.chooseVolume(
+ StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
+ container.populatePathFields(scmId, containerVolume);
+ try {
+ FileInputStream fis = new FileInputStream(folderToExport);
+ fis.close();
+ container.importContainerData(fis, packer);
+ fail("Container import should fail");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof IOException);
+ } finally {
+ File directory =
+ new File(container.getContainerData().getContainerPath());
+ assertFalse(directory.exists());
+ }
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index c15841b001..2d83471691 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -22,10 +22,13 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,18 +37,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
import org.apache.ozone.test.LambdaTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -55,7 +55,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
/**
* Test the tar/untar for a given container.
@@ -73,8 +72,7 @@ public class TestTarContainerPacker {
private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
- private final ContainerPacker<KeyValueContainerData> packer
- = new TarContainerPacker();
+ private TarContainerPacker packer;
private static final Path SOURCE_CONTAINER_ROOT =
Paths.get("target/test/data/packer-source-dir");
@@ -91,16 +89,29 @@ public class TestTarContainerPacker {
private final String schemaVersion;
private OzoneConfiguration conf;
- public TestTarContainerPacker(ContainerTestVersionInfo versionInfo) {
+ public TestTarContainerPacker(
+ ContainerTestVersionInfo versionInfo, String compression) {
this.layout = versionInfo.getLayout();
this.schemaVersion = versionInfo.getSchemaVersion();
this.conf = new OzoneConfiguration();
ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+ packer = new TarContainerPacker(compression);
+
}
@Parameterized.Parameters
public static Iterable<Object[]> parameters() {
- return ContainerTestVersionInfo.versionParameters();
+ List<ContainerTestVersionInfo> layoutList =
+ ContainerTestVersionInfo.getLayoutList();
+ List<Object[]> parameterList = new ArrayList<>();
+ for (ContainerTestVersionInfo containerTestVersionInfo : layoutList) {
+ for (Map.Entry<CopyContainerCompression, String> entry :
+ CopyContainerCompression.getCompressionMapping().entrySet()) {
+ parameterList.add(
+ new Object[]{containerTestVersionInfo, entry.getValue()});
+ }
+ }
+ return parameterList;
}
@BeforeClass
@@ -174,8 +185,7 @@ public class TestTarContainerPacker {
//THEN: check the result
TarArchiveInputStream tarStream = null;
try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
- CompressorInputStream uncompressed = new CompressorStreamFactory()
- .createCompressorInputStream(GZIP, input);
+ InputStream uncompressed = packer.decompress(input);
tarStream = new TarArchiveInputStream(uncompressed);
TarArchiveEntry entry;
@@ -353,9 +363,8 @@ public class TestTarContainerPacker {
throws Exception {
File targetFile = TEMP_DIR.resolve("container.tar.gz").toFile();
try (FileOutputStream output = new FileOutputStream(targetFile);
- CompressorOutputStream gzipped = new CompressorStreamFactory()
- .createCompressorOutputStream(GZIP, output);
- ArchiveOutputStream archive = new TarArchiveOutputStream(gzipped)) {
+ OutputStream compressed = packer.compress(output);
+ ArchiveOutputStream archive = new TarArchiveOutputStream(compressed)) {
TarContainerPacker.includeFile(file, entryName, archive);
}
return targetFile;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 94a83ccc49..ec0d9bc335 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -655,7 +655,7 @@ public class TestDatanodeUpgradeToScmHA {
File destination = tempFolder.newFile();
try (FileOutputStream fos = new FileOutputStream(destination)) {
- replicationSource.copyData(containerId, fos);
+ replicationSource.copyData(containerId, fos, "NO_COMPRESSION");
}
return destination;
}
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index daa47b7a17..1b7fcad140 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -477,11 +477,20 @@ message GetSmallFileResponseProto {
required ReadChunkResponseProto data = 1;
}
+enum CopyContainerCompressProto {
+ NO_COMPRESSION = 1;
+ GZIP = 2;
+ LZ4 = 3;
+ SNAPPY = 4;
+ ZSTD = 5;
+}
+
message CopyContainerRequestProto {
required int64 containerID = 1;
required uint64 readOffset = 2;
optional uint64 len = 3;
optional uint32 version = 4;
+ optional CopyContainerCompressProto compression = 5;
}
message CopyContainerResponseProto {
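Since the new field is optional and this is a proto2 definition, a request from a pre-upgrade datanode simply omits it; an unset enum field reads back as the first declared value, NO_COMPRESSION, and the hasCompression() check in GrpcReplicationService makes the same fallback explicit. A sketch of that wire-compatibility behavior against the generated classes (the wrapper class is illustrative):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerCompressProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;

public final class WireCompatSketch {
  public static void main(String[] args) {
    CopyContainerRequestProto legacy = CopyContainerRequestProto.newBuilder()
        .setContainerID(1L)
        .setReadOffset(0)
        .build();                                 // compression never set
    System.out.println(legacy.hasCompression());  // false
    // proto2: an unset optional enum reads back as the first declared value.
    System.out.println(legacy.getCompression()
        == CopyContainerCompressProto.NO_COMPRESSION);  // true
  }
}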
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index b5ecdd1d8f..828dc7e27a 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -479,6 +479,7 @@ BSD 3-Clause
BSD 2-Clause
=====================
+ com.github.luben:zstd-jni
dnsjava:dnsjava
org.codehaus.woodstox:stax2-api
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 0e900808c1..c2561d8899 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -265,3 +265,4 @@ share/ozone/lib/token-provider.jar
share/ozone/lib/txw2.jar
share/ozone/lib/weld-servlet.Final.jar
share/ozone/lib/woodstox-core.jar
+share/ozone/lib/zstd-jni.jar
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
index 0a00959f7e..e2f0a25569 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.util.concurrent.Callable;
+import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
/**
@@ -61,6 +62,7 @@ public class ExportSubcommand implements Callable<Void> {
description = "Count of containers to export")
private long containerCount = 1;
+
@Override
public Void call() throws Exception {
parent.loadContainersFromVolumes();
@@ -74,7 +76,7 @@ public class ExportSubcommand implements Callable<Void> {
new File(destination, "container-" + containerId + ".tar.gz");
try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
try {
- replicationSource.copyData(containerId, fos);
+ replicationSource.copyData(containerId, fos, GZIP);
} catch (StorageContainerException e) {
if (e.getResult() == CONTAINER_NOT_FOUND) {
continue;
diff --git a/pom.xml b/pom.xml
index fc38302dfc..beacc6c2fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<commons-collections.version>3.2.2</commons-collections.version>
<commons-compress.version>1.21</commons-compress.version>
<commons-configuration2.version>2.1.1</commons-configuration2.version>
+ <zstd-jni.version>1.5.2-5</zstd-jni.version>
<commons-daemon.version>1.0.13</commons-daemon.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-lang3.version>3.7</commons-lang3.version>
@@ -762,6 +763,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<artifactId>commons-validator</artifactId>
<version>${commons-validator.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${zstd-jni.version}</version>
+ </dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]