This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4184499e80 HDDS-7623. Do not compress container re-replication traffic by default (#4089)
4184499e80 is described below

commit 4184499e80ef9c7c9e613d55a64cdc8319ab2e6a
Author: Chung En Lee <[email protected]>
AuthorDate: Tue Dec 27 12:34:34 2022 +0800

    HDDS-7623. Do not compress container re-replication traffic by default (#4089)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   2 +
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 hadoop-hdds/container-service/pom.xml              |   4 +
 .../container/keyvalue/TarContainerPacker.java     |  34 +++--
 .../replication/ContainerReplicationSource.java    |   5 +-
 .../replication/CopyContainerCompression.java      |  78 ++++++++++++
 .../replication/GrpcReplicationClient.java         |  12 +-
 .../replication/GrpcReplicationService.java        |   8 +-
 .../OnDemandContainerReplicationSource.java        |  19 ++-
 .../replication/SimpleContainerDownloader.java     |   5 +-
 .../container/keyvalue/TestKeyValueContainer.java  | 141 +++++++++++----------
 .../container/keyvalue/TestTarContainerPacker.java |  37 ++++--
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |   2 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |   9 ++
 hadoop-ozone/dist/src/main/license/bin/LICENSE.txt |   1 +
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   1 +
 .../ozone/debug/container/ExportSubcommand.java    |   4 +-
 pom.xml                                            |   6 +
 18 files changed, 273 insertions(+), 103 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index bd96f0c26a..64772553e0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -194,6 +194,8 @@ public final class HddsConfigKeys {
 
   public static final String HDDS_X509_RENEW_GRACE_DURATION_DEFAULT = "P28D";
 
+  public static final String HDDS_CONTAINER_REPLICATION_COMPRESSION =
+      "hdds.container.replication.compression";
   public static final String HDDS_X509_ROOTCA_CERTIFICATE_FILE =
       "hdds.x509.rootca.certificate.file";
 
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 35fb3d7c23..d23473ec85 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2117,6 +2117,14 @@
     <tag>OZONE, HDDS, SECURITY</tag>
     <description>X509 signature certificate.</description>
   </property>
+  <property>
+    <name>hdds.container.replication.compression</name>
+    <value>NO_COMPRESSION</value>
+    <tag>OZONE, HDDS, DATANODE</tag>
+    <description>Compression algorithm used for closed container replication.
+      Possible choices include NO_COMPRESSION, GZIP, SNAPPY, LZ4, ZSTD
+    </description>
+  </property>
   <property>
     <name>hdds.x509.rootca.certificate.file</name>
     <value></value>
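
For reference, a minimal sketch of opting back into compression programmatically; the GZIP value and the example class are illustrative only, and the same effect comes from setting the property in ozone-site.xml:

import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

// Hypothetical example, not part of this patch.
public final class ReplicationCompressionExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION, "GZIP");

    // getConf(...) falls back to NO_COMPRESSION for unsupported values.
    System.out.println(CopyContainerCompression.getConf(conf)); // GZIP
  }
}
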
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 9bfd9433bb..3f1990bfc0 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -42,6 +42,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-common</artifactId>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index e555c10814..4c851979ed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -28,8 +28,10 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Objects;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -60,6 +62,18 @@ public class TarContainerPacker
 
   private static final String CONTAINER_FILE_NAME = "container.yaml";
 
+  private final String compression;
+
+  private static final String NO_COMPRESSION = "no_compression";
+
+  public TarContainerPacker() {
+    this.compression = NO_COMPRESSION;
+  }
+
+  public TarContainerPacker(String compression) {
+    this.compression = compression;
+  }
+
   /**
    * Given an input stream (tar file) extract the data to the specified
    * directories.
@@ -77,7 +91,7 @@ public class TarContainerPacker
     Path chunksRoot = Paths.get(containerData.getChunksPath());
 
     try (InputStream decompressed = decompress(input);
-         ArchiveInputStream archiveInput = untar(decompressed)) {
+        ArchiveInputStream archiveInput = untar(decompressed)) {
 
       ArchiveEntry entry = archiveInput.getNextEntry();
       while (entry != null) {
@@ -180,7 +194,7 @@ public class TarContainerPacker
   public byte[] unpackContainerDescriptor(InputStream input)
       throws IOException {
     try (InputStream decompressed = decompress(input);
-         ArchiveInputStream archiveInput = untar(decompressed)) {
+        ArchiveInputStream archiveInput = untar(decompressed)) {
 
       ArchiveEntry entry = archiveInput.getNextEntry();
       while (entry != null) {
@@ -259,16 +273,20 @@ public class TarContainerPacker
     return new TarArchiveOutputStream(output);
   }
 
-  private static InputStream decompress(InputStream input)
+  @VisibleForTesting
+  InputStream decompress(InputStream input)
       throws CompressorException {
-    return new CompressorStreamFactory()
-        .createCompressorInputStream(CompressorStreamFactory.GZIP, input);
+    return Objects.equals(compression, NO_COMPRESSION) ?
+        input : new CompressorStreamFactory()
+        .createCompressorInputStream(compression, input);
   }
 
-  private static OutputStream compress(OutputStream output)
+  @VisibleForTesting
+  OutputStream compress(OutputStream output)
       throws CompressorException {
-    return new CompressorStreamFactory()
-        .createCompressorOutputStream(CompressorStreamFactory.GZIP, output);
+    return Objects.equals(compression, NO_COMPRESSION) ?
+        output : new CompressorStreamFactory()
+        .createCompressorOutputStream(compression, output);
   }
 
 }
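
A short sketch of the resulting stream handling, assuming a caller in the same org.apache.hadoop.ozone.container.keyvalue package (compress and decompress are package-private @VisibleForTesting hooks) and commons-compress on the classpath:

package org.apache.hadoop.ozone.container.keyvalue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;

final class PackerCompressionSketch {
  public static void main(String[] args) throws Exception {
    InputStream raw = new ByteArrayInputStream(new byte[] {1, 2, 3});

    // Default constructor selects "no_compression": the stream is
    // returned untouched (run with -ea to check the assertion).
    TarContainerPacker plain = new TarContainerPacker();
    assert plain.decompress(raw) == raw;

    // Any commons-compress codec name wraps the stream in that codec.
    TarContainerPacker gzip =
        new TarContainerPacker(CompressorStreamFactory.GZIP);
    OutputStream compressed = gzip.compress(new ByteArrayOutputStream());
    compressed.close();
  }
}
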
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
index 69582f799f..72f32b2089 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -40,10 +40,11 @@ public interface ContainerReplicationSource {
    * Copy the container data to an output stream.
    *
    * @param containerId Container to replicate
-   * @param destination   The destination stream to copy all the container data.
+   * @param destination The destination stream to copy all the container data.
+   * @param compression Compression algorithm.
    * @throws IOException
    */
-  void copyData(long containerId, OutputStream destination)
+  void copyData(long containerId, OutputStream destination, String compression)
       throws IOException;
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
new file mode 100644
index 0000000000..67f0d6fc85
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerCompression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPLICATION_COMPRESSION;
+
+/**
+ * Defines compression algorithm for container replication.
+ */
+public enum CopyContainerCompression {
+
+  NO_COMPRESSION,
+  GZIP,
+  LZ4,
+  SNAPPY,
+  ZSTD;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CopyContainerCompression.class);
+
+  private static final CopyContainerCompression DEFAULT_COMPRESSION =
+      CopyContainerCompression.NO_COMPRESSION;
+  private static final Map<CopyContainerCompression, String>
+      COMPRESSION_MAPPING = ImmutableMap.copyOf(getMapping());
+
+  private static Map<CopyContainerCompression, String> getMapping() {
+    return new HashMap<CopyContainerCompression, String>() { {
+        put(NO_COMPRESSION, "no_compression");
+        put(GZIP, CompressorStreamFactory.GZIP);
+        put(LZ4, CompressorStreamFactory.LZ4_FRAMED);
+        put(SNAPPY, CompressorStreamFactory.SNAPPY_FRAMED);
+        put(ZSTD, CompressorStreamFactory.ZSTANDARD);
+      }};
+  }
+
+  public static Map<CopyContainerCompression, String> getCompressionMapping() {
+    return COMPRESSION_MAPPING;
+  }
+
+  public static CopyContainerCompression getConf(ConfigurationSource conf) {
+    try {
+      return conf.getEnum(HDDS_CONTAINER_REPLICATION_COMPRESSION,
+          DEFAULT_COMPRESSION);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Unsupported compression codec. Skip compression.");
+      return DEFAULT_COMPRESSION;
+    }
+  }
+
+  public static CopyContainerCompression getDefaultCompression() {
+    return NO_COMPRESSION;
+  }
+}
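
The enum doubles as the bridge from configuration values to commons-compress codec names; a minimal sketch of the lookup (the example class is hypothetical, not part of this patch):

import java.util.Map;
import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;

public final class CompressionMappingSketch {
  public static void main(String[] args) {
    Map<CopyContainerCompression, String> mapping =
        CopyContainerCompression.getCompressionMapping();

    // GZIP resolves to CompressorStreamFactory.GZIP and ZSTD to
    // CompressorStreamFactory.ZSTANDARD, per the table above.
    System.out.println(mapping.get(CopyContainerCompression.GZIP));
    System.out.println(mapping.get(CopyContainerCompression.ZSTD));
  }
}
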
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index e1348614e7..580b725fc4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
@@ -60,8 +61,12 @@ public class GrpcReplicationClient implements AutoCloseable {
 
   private final Path workingDirectory;
 
-  public GrpcReplicationClient(String host, int port, Path workingDir,
-      SecurityConfig secConfig, CertificateClient certClient)
+  private final ContainerProtos.CopyContainerCompressProto compression;
+
+  public GrpcReplicationClient(
+      String host, int port, Path workingDir,
+      SecurityConfig secConfig, CertificateClient certClient,
+      String compression)
       throws IOException {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forAddress(host, port)
@@ -87,6 +92,8 @@ public class GrpcReplicationClient implements AutoCloseable {
     channel = channelBuilder.build();
     client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
     workingDirectory = workingDir;
+    this.compression =
+        ContainerProtos.CopyContainerCompressProto.valueOf(compression);
   }
 
   public CompletableFuture<Path> download(long containerId) {
@@ -95,6 +102,7 @@ public class GrpcReplicationClient implements AutoCloseable {
             .setContainerID(containerId)
             .setLen(-1)
             .setReadOffset(0)
+            .setCompression(compression)
             .build();
 
     CompletableFuture<Path> response = new CompletableFuture<>();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 60897a5db8..7246ae99f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -49,11 +49,15 @@ public class GrpcReplicationService extends
   public void download(CopyContainerRequestProto request,
       StreamObserver<CopyContainerResponseProto> responseObserver) {
     long containerID = request.getContainerID();
-    LOG.info("Streaming container data ({}) to other datanode", containerID);
+    String compression = request.hasCompression() ?
+        request.getCompression().toString() : CopyContainerCompression
+        .getDefaultCompression().toString();
+    LOG.info("Streaming container data ({}) to other datanode " +
+        "with compression {}", containerID, compression);
     try {
       GrpcOutputStream outputStream =
           new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE);
-      source.copyData(containerID, outputStream);
+      source.copyData(containerID, outputStream, compression);
     } catch (IOException e) {
       LOG.error("Error streaming container {}", containerID, e);
       responseObserver.onError(e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index 58485ef12f..d17e29b097 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -37,11 +39,16 @@ public class OnDemandContainerReplicationSource
 
   private final ContainerController controller;
 
-  private final TarContainerPacker packer = new TarContainerPacker();
+  private Map<String, TarContainerPacker> packer = new HashMap<>();
 
   public OnDemandContainerReplicationSource(
       ContainerController controller) {
     this.controller = controller;
+    for (Map.Entry<CopyContainerCompression, String> entry :
+        CopyContainerCompression.getCompressionMapping().entrySet()) {
+      packer.put(
+          entry.getKey().toString(), new TarContainerPacker(entry.getValue()));
+    }
   }
 
   @Override
@@ -50,7 +57,8 @@ public class OnDemandContainerReplicationSource
   }
 
   @Override
-  public void copyData(long containerId, OutputStream destination)
+  public void copyData(long containerId, OutputStream destination,
+                       String compression)
       throws IOException {
 
     Container container = controller.getContainer(containerId);
@@ -60,8 +68,13 @@ public class OnDemandContainerReplicationSource
           " is not found.", CONTAINER_NOT_FOUND);
     }
 
+    if (!packer.containsKey(compression)) {
+      throw new IOException("Can't compress the container. Compression " +
+          compression + " is not found.");
+    }
     controller.exportContainer(
-        container.getContainerType(), containerId, destination, packer);
+        container.getContainerType(), containerId, destination,
+        packer.get(compression));
 
   }
 }
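
One packer per codec is pre-built at construction time and resolved by enum name on each request; a hedged sketch of that path, assuming ContainerController from org.apache.hadoop.ozone.container.ozoneimpl and a caller-supplied destination stream:

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;

public final class ReplicationSourceSketch {
  // Hypothetical helper, not part of this patch.
  static void export(ContainerController controller, long containerId,
      OutputStream destination) throws IOException {
    OnDemandContainerReplicationSource source =
        new OnDemandContainerReplicationSource(controller);
    // Map keys are the enum names; an unknown name fails fast with an
    // IOException instead of streaming with the wrong packer.
    source.copyData(containerId, destination, "ZSTD");
  }
}
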
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 18fe058728..e46cd7c1b1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Simple ContainerDownloaderImplementation to download the missing container
  * from the first available datanode.
@@ -53,6 +54,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   private final Path workingDirectory;
   private final SecurityConfig securityConfig;
   private final CertificateClient certClient;
+  private final String compression;
 
   public SimpleContainerDownloader(ConfigurationSource conf,
       CertificateClient certClient) {
@@ -67,6 +69,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
     }
     securityConfig = new SecurityConfig(conf);
     this.certClient = certClient;
+    this.compression = CopyContainerCompression.getConf(conf).toString();
   }
 
   @Override
@@ -117,7 +120,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
     GrpcReplicationClient grpcReplicationClient =
         new GrpcReplicationClient(datanode.getIpAddress(),
             datanode.getPort(Name.REPLICATION).getValue(),
-            workingDirectory, securityConfig, certClient);
+            workingDirectory, securityConfig, certClient, compression);
     result = grpcReplicationClient.download(containerId)
         .whenComplete((r, ex) -> {
           try {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index b23600ffd8..0ab457a52b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 
@@ -220,83 +221,85 @@ public class TestKeyValueContainer {
 
     //destination path
     File folderToExport = folder.newFile("exported.tar.gz");
+    for (Map.Entry<CopyContainerCompression, String> entry :
+        CopyContainerCompression.getCompressionMapping().entrySet()) {
+      TarContainerPacker packer = new TarContainerPacker(entry.getValue());
+
+      //export the container
+      try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
+        keyValueContainer
+            .exportContainerData(fos, packer);
+      }
 
-    TarContainerPacker packer = new TarContainerPacker();
+      //delete the original one
+      keyValueContainer.delete();
 
-    //export the container
-    try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
-      keyValueContainer
-          .exportContainerData(fos, packer);
-    }
+      //create a new one
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerId,
+              keyValueContainerData.getLayoutVersion(),
+              keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+              datanodeId.toString());
+      containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+      KeyValueContainer container = new KeyValueContainer(containerData, CONF);
 
-    //delete the original one
-    keyValueContainer.delete();
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
 
-    //create a new one
-    KeyValueContainerData containerData =
-        new KeyValueContainerData(containerId,
-            keyValueContainerData.getLayoutVersion(),
-            keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
-            datanodeId.toString());
-    containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
-    KeyValueContainer container = new KeyValueContainer(containerData, CONF);
-
-    HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(
-        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
-
-    container.populatePathFields(scmId, containerVolume);
-    try (FileInputStream fis = new FileInputStream(folderToExport)) {
-      container.importContainerData(fis, packer);
-    }
-
-    assertEquals("value1", containerData.getMetadata().get("key1"));
-    assertEquals(keyValueContainerData.getContainerDBType(),
-        containerData.getContainerDBType());
-    assertEquals(keyValueContainerData.getState(),
-        containerData.getState());
-    assertEquals(numberOfKeysToWrite,
-        containerData.getBlockCount());
-    assertEquals(keyValueContainerData.getLayoutVersion(),
-        containerData.getLayoutVersion());
-    assertEquals(keyValueContainerData.getMaxSize(),
-        containerData.getMaxSize());
-    assertEquals(keyValueContainerData.getBytesUsed(),
-        containerData.getBytesUsed());
-
-    //Can't overwrite existing container
-    try {
+      container.populatePathFields(scmId, containerVolume);
       try (FileInputStream fis = new FileInputStream(folderToExport)) {
         container.importContainerData(fis, packer);
       }
-      fail("Container is imported twice. Previous files are overwritten");
-    } catch (IOException ex) {
-      //all good
-      assertTrue(container.getContainerFile().exists());
-    }
 
-    //Import failure should cleanup the container directory
-    containerData =
-        new KeyValueContainerData(containerId + 1,
-            keyValueContainerData.getLayoutVersion(),
-            keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
-            datanodeId.toString());
-    containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
-    container = new KeyValueContainer(containerData, CONF);
-
-    containerVolume = volumeChoosingPolicy.chooseVolume(
-        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
-    container.populatePathFields(scmId, containerVolume);
-    try {
-      FileInputStream fis = new FileInputStream(folderToExport);
-      fis.close();
-      container.importContainerData(fis, packer);
-      fail("Container import should fail");
-    } catch (Exception ex) {
-      assertTrue(ex instanceof IOException);
-    } finally {
-      File directory =
-          new File(container.getContainerData().getContainerPath());
-      assertFalse(directory.exists());
+      assertEquals("value1", containerData.getMetadata().get("key1"));
+      assertEquals(keyValueContainerData.getContainerDBType(),
+          containerData.getContainerDBType());
+      assertEquals(keyValueContainerData.getState(),
+          containerData.getState());
+      assertEquals(numberOfKeysToWrite,
+          containerData.getBlockCount());
+      assertEquals(keyValueContainerData.getLayoutVersion(),
+          containerData.getLayoutVersion());
+      assertEquals(keyValueContainerData.getMaxSize(),
+          containerData.getMaxSize());
+      assertEquals(keyValueContainerData.getBytesUsed(),
+          containerData.getBytesUsed());
+
+      //Can't overwrite existing container
+      try {
+        try (FileInputStream fis = new FileInputStream(folderToExport)) {
+          container.importContainerData(fis, packer);
+        }
+        fail("Container is imported twice. Previous files are overwritten");
+      } catch (IOException ex) {
+        //all good
+        assertTrue(container.getContainerFile().exists());
+      }
+
+      //Import failure should cleanup the container directory
+      containerData =
+          new KeyValueContainerData(containerId + 1,
+              keyValueContainerData.getLayoutVersion(),
+              keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
+              datanodeId.toString());
+      containerData.setSchemaVersion(keyValueContainerData.getSchemaVersion());
+      container = new KeyValueContainer(containerData, CONF);
+
+      containerVolume = volumeChoosingPolicy.chooseVolume(
+          StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), 1);
+      container.populatePathFields(scmId, containerVolume);
+      try {
+        FileInputStream fis = new FileInputStream(folderToExport);
+        fis.close();
+        container.importContainerData(fis, packer);
+        fail("Container import should fail");
+      } catch (Exception ex) {
+        assertTrue(ex instanceof IOException);
+      } finally {
+        File directory =
+            new File(container.getContainerData().getContainerPath());
+        assertFalse(directory.exists());
+      }
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index c15841b001..2d83471691 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -22,10 +22,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,18 +37,15 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.ozone.container.replication.CopyContainerCompression;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -55,7 +55,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
 
 /**
  * Test the tar/untar for a given container.
@@ -73,8 +72,7 @@ public class TestTarContainerPacker {
 
   private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
 
-  private final ContainerPacker<KeyValueContainerData> packer
-      = new TarContainerPacker();
+  private TarContainerPacker packer;
 
   private static final Path SOURCE_CONTAINER_ROOT =
       Paths.get("target/test/data/packer-source-dir");
@@ -91,16 +89,29 @@ public class TestTarContainerPacker {
   private final String schemaVersion;
   private OzoneConfiguration conf;
 
-  public TestTarContainerPacker(ContainerTestVersionInfo versionInfo) {
+  public TestTarContainerPacker(
+      ContainerTestVersionInfo versionInfo, String compression) {
     this.layout = versionInfo.getLayout();
     this.schemaVersion = versionInfo.getSchemaVersion();
     this.conf = new OzoneConfiguration();
     ContainerTestVersionInfo.setTestSchemaVersion(schemaVersion, conf);
+    packer = new TarContainerPacker(compression);
+
   }
 
   @Parameterized.Parameters
   public static Iterable<Object[]> parameters() {
-    return ContainerTestVersionInfo.versionParameters();
+    List<ContainerTestVersionInfo> layoutList =
+        ContainerTestVersionInfo.getLayoutList();
+    List<Object[]> parameterList = new ArrayList<>();
+    for (ContainerTestVersionInfo containerTestVersionInfo : layoutList) {
+      for (Map.Entry<CopyContainerCompression, String> entry :
+          CopyContainerCompression.getCompressionMapping().entrySet()) {
+        parameterList.add(
+            new Object[]{containerTestVersionInfo, entry.getValue()});
+      }
+    }
+    return parameterList;
   }
 
   @BeforeClass
@@ -174,8 +185,7 @@ public class TestTarContainerPacker {
     //THEN: check the result
     TarArchiveInputStream tarStream = null;
     try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
-      CompressorInputStream uncompressed = new CompressorStreamFactory()
-          .createCompressorInputStream(GZIP, input);
+      InputStream uncompressed = packer.decompress(input);
       tarStream = new TarArchiveInputStream(uncompressed);
 
       TarArchiveEntry entry;
@@ -353,9 +363,8 @@ public class TestTarContainerPacker {
       throws Exception {
     File targetFile = TEMP_DIR.resolve("container.tar.gz").toFile();
     try (FileOutputStream output = new FileOutputStream(targetFile);
-         CompressorOutputStream gzipped = new CompressorStreamFactory()
-             .createCompressorOutputStream(GZIP, output);
-         ArchiveOutputStream archive = new TarArchiveOutputStream(gzipped)) {
+         OutputStream compressed = packer.compress(output);
+         ArchiveOutputStream archive = new TarArchiveOutputStream(compressed)) {
       TarContainerPacker.includeFile(file, entryName, archive);
     }
     return targetFile;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 94a83ccc49..ec0d9bc335 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -655,7 +655,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     File destination = tempFolder.newFile();
     try (FileOutputStream fos = new FileOutputStream(destination)) {
-      replicationSource.copyData(containerId, fos);
+      replicationSource.copyData(containerId, fos, "NO_COMPRESSION");
     }
     return destination;
   }
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index daa47b7a17..1b7fcad140 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -477,11 +477,20 @@ message GetSmallFileResponseProto {
   required ReadChunkResponseProto data = 1;
 }
 
+enum CopyContainerCompressProto {
+  NO_COMPRESSION = 1;
+  GZIP = 2;
+  LZ4 = 3;
+  SNAPPY = 4;
+  ZSTD = 5;
+}
+
 message CopyContainerRequestProto {
   required int64 containerID = 1;
   required uint64 readOffset = 2;
   optional uint64 len = 3;
   optional uint32 version = 4;
+  optional CopyContainerCompressProto compression = 5;
 }
 
 message CopyContainerResponseProto {
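
On the wire the codec travels as optional field 5 of CopyContainerRequestProto; a sketch of a request carrying it (the id and offsets are placeholders, and the example class is hypothetical):

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

public final class CopyContainerRequestSketch {
  public static void main(String[] args) {
    ContainerProtos.CopyContainerRequestProto request =
        ContainerProtos.CopyContainerRequestProto.newBuilder()
            .setContainerID(1L)
            .setReadOffset(0)
            .setLen(-1)
            .setCompression(
                ContainerProtos.CopyContainerCompressProto.NO_COMPRESSION)
            .build();
    // Peers that predate field 5 simply never set it; the service then
    // falls back to the default via hasCompression().
    System.out.println(request.hasCompression()); // true
  }
}
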
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index b5ecdd1d8f..828dc7e27a 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -479,6 +479,7 @@ BSD 3-Clause
 BSD 2-Clause
 =====================
 
+   com.github.luben:zstd-jni
    dnsjava:dnsjava
    org.codehaus.woodstox:stax2-api
 
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 0e900808c1..c2561d8899 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -265,3 +265,4 @@ share/ozone/lib/token-provider.jar
 share/ozone/lib/txw2.jar
 share/ozone/lib/weld-servlet.Final.jar
 share/ozone/lib/woodstox-core.jar
+share/ozone/lib/zstd-jni.jar
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
index 0a00959f7e..e2f0a25569 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ExportSubcommand.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.concurrent.Callable;
 
+import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
 
 /**
@@ -61,6 +62,7 @@ public class ExportSubcommand implements Callable<Void> {
       description = "Count of containers to export")
   private long containerCount = 1;
 
+
   @Override
   public Void call() throws Exception {
     parent.loadContainersFromVolumes();
@@ -74,7 +76,7 @@ public class ExportSubcommand implements Callable<Void> {
           new File(destination, "container-" + containerId + ".tar.gz");
       try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
         try {
-          replicationSource.copyData(containerId, fos);
+          replicationSource.copyData(containerId, fos, GZIP);
         } catch (StorageContainerException e) {
           if (e.getResult() == CONTAINER_NOT_FOUND) {
             continue;
diff --git a/pom.xml b/pom.xml
index fc38302dfc..beacc6c2fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     <commons-collections.version>3.2.2</commons-collections.version>
     <commons-compress.version>1.21</commons-compress.version>
     <commons-configuration2.version>2.1.1</commons-configuration2.version>
+    <zstd-jni.version>1.5.2-5</zstd-jni.version>
     <commons-daemon.version>1.0.13</commons-daemon.version>
     <commons-io.version>2.11.0</commons-io.version>
     <commons-lang3.version>3.7</commons-lang3.version>
@@ -762,6 +763,11 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
         <artifactId>commons-validator</artifactId>
         <version>${commons-validator.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.github.luben</groupId>
+        <artifactId>zstd-jni</artifactId>
+        <version>${zstd-jni.version}</version>
+      </dependency>
       <dependency>
         <groupId>javax.activation</groupId>
         <artifactId>activation</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

