This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 223f95cde8c Pipe: add compression level config for connector ZSTD 
compressor (#12630)
223f95cde8c is described below

commit 223f95cde8cdb1bb34c715e1687dc2e15974df45
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Jun 6 16:36:51 2024 +0800

    Pipe: add compression level config for connector ZSTD compressor (#12630)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../IoTDBPipeConnectorCompressionIT.java           | 126 +++++++++++++++++++++
 iotdb-core/node-commons/pom.xml                    |   4 +
 .../config/constant/PipeConnectorConstant.java     |  10 ++
 ...TDCompressor.java => PipeCompressorConfig.java} |  28 ++---
 .../compressor/PipeCompressorFactory.java          |  62 +++++++---
 .../connector/compressor/PipeZSTDCompressor.java   |  17 ++-
 .../pipe/connector/protocol/IoTDBConnector.java    |  25 +++-
 7 files changed, 230 insertions(+), 42 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
index be4aa458d9f..f7c0c63b842 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.pipe.it.autocreate;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -36,11 +38,17 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.fail;
+
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
@@ -179,4 +187,122 @@ public class IoTDBPipeConnectorCompressionIT extends 
AbstractPipeDualAutoIT {
           Collections.singleton("8,"));
     }
   }
+
+  @Test
+  public void testZstdCompressorLevel() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList(
+              "insert into root.db.d1(time, s1) values (1, 1)",
+              "insert into root.db.d1(time, s2) values (1, 1)",
+              "insert into root.db.d1(time, s3) values (1, 1)",
+              "insert into root.db.d1(time, s4) values (1, 1)",
+              "insert into root.db.d1(time, s5) values (1, 1)",
+              "flush"))) {
+        return;
+      }
+
+      // Create 5 pipes with different zstd compression levels, p4 and p5 
should fail.
+
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format(
+                "create pipe p1"
+                    + " with extractor ('extractor.pattern'='root.db.d1.s1')"
+                    + " with connector ("
+                    + "'connector.ip'='%s',"
+                    + "'connector.port'='%s',"
+                    + "'connector.compressor'='zstd, zstd',"
+                    + "'connector.compressor.zstd.level'='3')",
+                receiverIp, receiverPort));
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format(
+                "create pipe p2"
+                    + " with extractor ('extractor.pattern'='root.db.d1.s2')"
+                    + " with connector ("
+                    + "'connector.ip'='%s',"
+                    + "'connector.port'='%s',"
+                    + "'connector.compressor'='zstd, zstd',"
+                    + "'connector.compressor.zstd.level'='22')",
+                receiverIp, receiverPort));
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format(
+                "create pipe p3"
+                    + " with extractor ('extractor.pattern'='root.db.d1.s3')"
+                    + " with connector ("
+                    + "'connector.ip'='%s',"
+                    + "'connector.port'='%s',"
+                    + "'connector.compressor'='zstd, zstd',"
+                    + "'connector.compressor.zstd.level'='-131072')",
+                receiverIp, receiverPort));
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format(
+                "create pipe p4"
+                    + " with extractor ('extractor.pattern'='root.db.d1.s4')"
+                    + " with connector ("
+                    + "'connector.ip'='%s',"
+                    + "'connector.port'='%s',"
+                    + "'connector.compressor'='zstd, zstd',"
+                    + "'connector.compressor.zstd.level'='-131073')",
+                receiverIp, receiverPort));
+        fail();
+      } catch (SQLException e) {
+        // Make sure the error message in IoTDBConnector.java is returned
+        Assert.assertTrue(e.getMessage().contains("Zstd compression level 
should be in the range"));
+      }
+
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format(
+                "create pipe p5"
+                    + " with extractor ('extractor.pattern'='root.db.d1.s5')"
+                    + " with connector ("
+                    + "'connector.ip'='%s',"
+                    + "'connector.port'='%s',"
+                    + "'connector.compressor'='zstd, zstd',"
+                    + "'connector.compressor.zstd.level'='23')",
+                receiverIp, receiverPort));
+        fail();
+      } catch (SQLException e) {
+        // Make sure the error message in IoTDBConnector.java is returned
+        Assert.assertTrue(e.getMessage().contains("Zstd compression level 
should be in the range"));
+      }
+
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(3, showPipeResult.size());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv, "count timeseries", "count(timeseries),", 
Collections.singleton("3,"));
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 8879e7ebedc..421f791face 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -155,6 +155,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-jexl3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 3d0bd4cab74..8d6879ae8c2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.commons.pipe.config.constant;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
+import com.github.luben.zstd.Zstd;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
@@ -177,6 +179,14 @@ public class PipeConnectorConstant {
                   CONNECTOR_COMPRESSOR_ZSTD,
                   CONNECTOR_COMPRESSOR_LZMA2)));
 
+  public static final String CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY =
+      "connector.compressor.zstd.level";
+  public static final String SINK_COMPRESSOR_ZSTD_LEVEL_KEY = 
"sink.compressor.zstd.level";
+  public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE =
+      Zstd.defaultCompressionLevel();
+  public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE = 
Zstd.minCompressionLevel();
+  public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE = 
Zstd.maxCompressionLevel();
+
   public static final String CONNECTOR_RATE_LIMIT_KEY = 
"connector.rate-limit-bytes-per-second";
   public static final String SINK_RATE_LIMIT_KEY = 
"sink.rate-limit-bytes-per-second";
   public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
similarity index 54%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
index 72782353d68..c13028c4d31 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
@@ -19,29 +19,21 @@
 
 package org.apache.iotdb.commons.pipe.connector.compressor;
 
-import org.apache.tsfile.compress.ICompressor;
-import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
+public class PipeCompressorConfig {
 
-import java.io.IOException;
+  private final String name;
+  private final int zstdCompressionLevel;
 
-public class PipeZSTDCompressor extends PipeCompressor {
-
-  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.ZSTD);
-  private static final IUnCompressor DECOMPRESSOR =
-      IUnCompressor.getUnCompressor(CompressionType.ZSTD);
-
-  public PipeZSTDCompressor() {
-    super(PipeCompressionType.ZSTD);
+  public PipeCompressorConfig(String name, int zstdCompressionLevel) {
+    this.name = name;
+    this.zstdCompressionLevel = zstdCompressionLevel;
   }
 
-  @Override
-  public byte[] compress(byte[] data) throws IOException {
-    return COMPRESSOR.compress(data);
+  public String getName() {
+    return name;
   }
 
-  @Override
-  public byte[] decompress(byte[] byteArray) throws IOException {
-    return DECOMPRESSOR.uncompress(byteArray);
+  public int getZstdCompressionLevel() {
+    return zstdCompressionLevel;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
index 14ae972fc4f..bcee9b85aeb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
@@ -19,50 +19,86 @@
 
 package org.apache.iotdb.commons.pipe.connector.compressor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZMA2;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SNAPPY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE;
 
 public class PipeCompressorFactory {
 
-  private static Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE = new 
HashMap<>();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeCompressorFactory.class);
+
+  private static final Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE 
=
+      new ConcurrentHashMap<>();
 
   static {
     COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_SNAPPY, new 
PipeSnappyCompressor());
     COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_GZIP, new 
PipeGZIPCompressor());
     COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZ4, new 
PipeLZ4Compressor());
-    COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_ZSTD, new 
PipeZSTDCompressor());
+    COMPRESSOR_NAME_TO_INSTANCE.put(
+        CONNECTOR_COMPRESSOR_ZSTD,
+        new PipeZSTDCompressor(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE));
     COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZMA2, new 
PipeLZMA2Compressor());
-    COMPRESSOR_NAME_TO_INSTANCE = 
Collections.unmodifiableMap(COMPRESSOR_NAME_TO_INSTANCE);
   }
 
-  public static PipeCompressor getCompressor(String name) {
-    final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(name);
-    if (compressor == null) {
-      throw new UnsupportedOperationException("PipeCompressor not found for 
name: " + name);
+  public static PipeCompressor getCompressor(PipeCompressorConfig config) {
+    if (config == null) {
+      throw new IllegalArgumentException("PipeCompressorConfig is null");
     }
-    return compressor;
+    if (config.getName() == null) {
+      throw new IllegalArgumentException("PipeCompressorConfig.getName() is 
null");
+    }
+
+    final String compressorName = config.getName();
+
+    // For ZSTD compressor, we need to consider the compression level
+    if (compressorName.equals(CONNECTOR_COMPRESSOR_ZSTD)) {
+      final int zstdCompressionLevel = config.getZstdCompressionLevel();
+      return COMPRESSOR_NAME_TO_INSTANCE.computeIfAbsent(
+          CONNECTOR_COMPRESSOR_ZSTD + "_" + zstdCompressionLevel,
+          key -> {
+            LOGGER.info("Create new PipeZSTDCompressor with level: {}", 
zstdCompressionLevel);
+            return new PipeZSTDCompressor(zstdCompressionLevel);
+          });
+    }
+
+    // For other compressors, we can directly get the instance by name
+    final PipeCompressor compressor = 
COMPRESSOR_NAME_TO_INSTANCE.get(compressorName);
+    if (compressor != null) {
+      return compressor;
+    }
+
+    throw new UnsupportedOperationException("PipeCompressor not found for 
name: " + compressorName);
   }
 
   private static Map<Byte, PipeCompressor> COMPRESSOR_INDEX_TO_INSTANCE = new 
HashMap<>();
 
   static {
     COMPRESSOR_INDEX_TO_INSTANCE.put(
-        PipeCompressor.PipeCompressionType.SNAPPY.getIndex(), new 
PipeSnappyCompressor());
+        PipeCompressor.PipeCompressionType.SNAPPY.getIndex(),
+        COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_SNAPPY));
     COMPRESSOR_INDEX_TO_INSTANCE.put(
-        PipeCompressor.PipeCompressionType.GZIP.getIndex(), new 
PipeGZIPCompressor());
+        PipeCompressor.PipeCompressionType.GZIP.getIndex(),
+        COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_GZIP));
     COMPRESSOR_INDEX_TO_INSTANCE.put(
-        PipeCompressor.PipeCompressionType.LZ4.getIndex(), new 
PipeLZ4Compressor());
+        PipeCompressor.PipeCompressionType.LZ4.getIndex(),
+        COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZ4));
     COMPRESSOR_INDEX_TO_INSTANCE.put(
-        PipeCompressor.PipeCompressionType.ZSTD.getIndex(), new 
PipeZSTDCompressor());
+        PipeCompressor.PipeCompressionType.ZSTD.getIndex(),
+        COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_ZSTD));
     COMPRESSOR_INDEX_TO_INSTANCE.put(
-        PipeCompressor.PipeCompressionType.LZMA2.getIndex(), new 
PipeLZMA2Compressor());
+        PipeCompressor.PipeCompressionType.LZMA2.getIndex(),
+        COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZMA2));
     COMPRESSOR_INDEX_TO_INSTANCE = 
Collections.unmodifiableMap(COMPRESSOR_INDEX_TO_INSTANCE);
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
index 72782353d68..50e2e1f845c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -19,29 +19,26 @@
 
 package org.apache.iotdb.commons.pipe.connector.compressor;
 
-import org.apache.tsfile.compress.ICompressor;
-import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
+import com.github.luben.zstd.Zstd;
 
 import java.io.IOException;
 
 public class PipeZSTDCompressor extends PipeCompressor {
 
-  private static final ICompressor COMPRESSOR = 
ICompressor.getCompressor(CompressionType.ZSTD);
-  private static final IUnCompressor DECOMPRESSOR =
-      IUnCompressor.getUnCompressor(CompressionType.ZSTD);
+  private final int compressionLevel;
 
-  public PipeZSTDCompressor() {
+  public PipeZSTDCompressor(int compressionLevel) {
     super(PipeCompressionType.ZSTD);
+    this.compressionLevel = compressionLevel;
   }
 
   @Override
   public byte[] compress(byte[] data) throws IOException {
-    return COMPRESSOR.compress(data);
+    return Zstd.compress(data, compressionLevel);
   }
 
   @Override
-  public byte[] decompress(byte[] byteArray) throws IOException {
-    return DECOMPRESSOR.uncompress(byteArray);
+  public byte[] decompress(byte[] byteArray) {
+    return Zstd.decompress(byteArray, (int) Zstd.decompressedSize(byteArray, 
0, byteArray.length));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index f65daf86d4f..ad8e328e45e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.connector.protocol;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
 import 
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
 import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
 import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
@@ -50,6 +51,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SET;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
@@ -72,6 +77,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_ZSTD_LEVEL_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -151,6 +157,21 @@ public abstract class IoTDBConnector implements 
PipeConnector {
             CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
         loadBalanceStrategy);
 
+    final int zstdCompressionLevel =
+        parameters.getIntOrDefault(
+            Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY, 
SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
+            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE);
+    validator.validate(
+        arg ->
+            (int) arg >= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE
+                && (int) arg <= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
+        String.format(
+            "Zstd compression level should be in the range [%d, %d], but got 
%d.",
+            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE,
+            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
+            zstdCompressionLevel),
+        zstdCompressionLevel);
+
     final String compressionTypes =
         parameters
             .getStringOrDefault(
@@ -170,7 +191,9 @@ public abstract class IoTDBConnector implements 
PipeConnector {
                 "Compressor should be one of %s, but got %s.",
                 CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
             trimmedCompressionType);
-        
compressors.add(PipeCompressorFactory.getCompressor(trimmedCompressionType));
+        compressors.add(
+            PipeCompressorFactory.getCompressor(
+                new PipeCompressorConfig(trimmedCompressionType, 
zstdCompressionLevel)));
       }
     }
     validator.validate(

Reply via email to