This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 223f95cde8c Pipe: add compression level config for connector ZSTD compressor (#12630)
223f95cde8c is described below
commit 223f95cde8cdb1bb34c715e1687dc2e15974df45
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Jun 6 16:36:51 2024 +0800
Pipe: add compression level config for connector ZSTD compressor (#12630)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../IoTDBPipeConnectorCompressionIT.java | 126 +++++++++++++++++++++
iotdb-core/node-commons/pom.xml | 4 +
.../config/constant/PipeConnectorConstant.java | 10 ++
...TDCompressor.java => PipeCompressorConfig.java} | 28 ++---
.../compressor/PipeCompressorFactory.java | 62 +++++++---
.../connector/compressor/PipeZSTDCompressor.java | 17 ++-
.../pipe/connector/protocol/IoTDBConnector.java | 25 +++-
7 files changed, 230 insertions(+), 42 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
index be4aa458d9f..f7c0c63b842 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConnectorCompressionIT.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.pipe.it.autocreate;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
@@ -36,11 +38,17 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.junit.Assert.fail;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeConnectorCompressionIT extends AbstractPipeDualAutoIT {
@@ -179,4 +187,122 @@ public class IoTDBPipeConnectorCompressionIT extends
AbstractPipeDualAutoIT {
Collections.singleton("8,"));
}
}
+
+ @Test
+ public void testZstdCompressorLevel() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1(time, s1) values (1, 1)",
+ "insert into root.db.d1(time, s2) values (1, 1)",
+ "insert into root.db.d1(time, s3) values (1, 1)",
+ "insert into root.db.d1(time, s4) values (1, 1)",
+ "insert into root.db.d1(time, s5) values (1, 1)",
+ "flush"))) {
+ return;
+ }
+
+ // Create 5 pipes with different zstd compression levels, p4 and p5 should fail.
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p1"
+ + " with extractor ('extractor.pattern'='root.db.d1.s1')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='3')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p2"
+ + " with extractor ('extractor.pattern'='root.db.d1.s2')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='22')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p3"
+ + " with extractor ('extractor.pattern'='root.db.d1.s3')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='-131072')",
+ receiverIp, receiverPort));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p4"
+ + " with extractor ('extractor.pattern'='root.db.d1.s4')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='-131073')",
+ receiverIp, receiverPort));
+ fail();
+ } catch (SQLException e) {
+ // Make sure the error message in IoTDBConnector.java is returned
+ Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
+ }
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe p5"
+ + " with extractor ('extractor.pattern'='root.db.d1.s5')"
+ + " with connector ("
+ + "'connector.ip'='%s',"
+ + "'connector.port'='%s',"
+ + "'connector.compressor'='zstd, zstd',"
+ + "'connector.compressor.zstd.level'='23')",
+ receiverIp, receiverPort));
+ fail();
+ } catch (SQLException e) {
+ // Make sure the error message in IoTDBConnector.java is returned
+ Assert.assertTrue(e.getMessage().contains("Zstd compression level should be in the range"));
+ }
+
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(3, showPipeResult.size());
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "count timeseries", "count(timeseries),",
Collections.singleton("3,"));
+ }
+ }
}
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 8879e7ebedc..421f791face 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -155,6 +155,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 3d0bd4cab74..8d6879ae8c2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.commons.pipe.config.constant;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import com.github.luben.zstd.Zstd;
+
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
@@ -177,6 +179,14 @@ public class PipeConnectorConstant {
CONNECTOR_COMPRESSOR_ZSTD,
CONNECTOR_COMPRESSOR_LZMA2)));
+ public static final String CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY =
+ "connector.compressor.zstd.level";
+ public static final String SINK_COMPRESSOR_ZSTD_LEVEL_KEY =
"sink.compressor.zstd.level";
+ public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE =
+ Zstd.defaultCompressionLevel();
+ public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE =
Zstd.minCompressionLevel();
+ public static final int CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE =
Zstd.maxCompressionLevel();
+
public static final String CONNECTOR_RATE_LIMIT_KEY =
"connector.rate-limit-bytes-per-second";
public static final String SINK_RATE_LIMIT_KEY =
"sink.rate-limit-bytes-per-second";
public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
similarity index 54%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
index 72782353d68..c13028c4d31 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorConfig.java
@@ -19,29 +19,21 @@
package org.apache.iotdb.commons.pipe.connector.compressor;
-import org.apache.tsfile.compress.ICompressor;
-import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
+public class PipeCompressorConfig {
-import java.io.IOException;
+ private final String name;
+ private final int zstdCompressionLevel;
-public class PipeZSTDCompressor extends PipeCompressor {
-
- private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.ZSTD);
- private static final IUnCompressor DECOMPRESSOR =
- IUnCompressor.getUnCompressor(CompressionType.ZSTD);
-
- public PipeZSTDCompressor() {
- super(PipeCompressionType.ZSTD);
+ public PipeCompressorConfig(String name, int zstdCompressionLevel) {
+ this.name = name;
+ this.zstdCompressionLevel = zstdCompressionLevel;
}
- @Override
- public byte[] compress(byte[] data) throws IOException {
- return COMPRESSOR.compress(data);
+ public String getName() {
+ return name;
}
- @Override
- public byte[] decompress(byte[] byteArray) throws IOException {
- return DECOMPRESSOR.uncompress(byteArray);
+ public int getZstdCompressionLevel() {
+ return zstdCompressionLevel;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
index 14ae972fc4f..bcee9b85aeb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeCompressorFactory.java
@@ -19,50 +19,86 @@
package org.apache.iotdb.commons.pipe.connector.compressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_GZIP;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZ4;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_LZMA2;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SNAPPY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE;
public class PipeCompressorFactory {
- private static Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE = new
HashMap<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeCompressorFactory.class);
+
+ private static final Map<String, PipeCompressor> COMPRESSOR_NAME_TO_INSTANCE
=
+ new ConcurrentHashMap<>();
static {
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_SNAPPY, new
PipeSnappyCompressor());
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_GZIP, new
PipeGZIPCompressor());
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZ4, new
PipeLZ4Compressor());
- COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_ZSTD, new
PipeZSTDCompressor());
+ COMPRESSOR_NAME_TO_INSTANCE.put(
+ CONNECTOR_COMPRESSOR_ZSTD,
+ new PipeZSTDCompressor(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE));
COMPRESSOR_NAME_TO_INSTANCE.put(CONNECTOR_COMPRESSOR_LZMA2, new
PipeLZMA2Compressor());
- COMPRESSOR_NAME_TO_INSTANCE =
Collections.unmodifiableMap(COMPRESSOR_NAME_TO_INSTANCE);
}
- public static PipeCompressor getCompressor(String name) {
- final PipeCompressor compressor = COMPRESSOR_NAME_TO_INSTANCE.get(name);
- if (compressor == null) {
- throw new UnsupportedOperationException("PipeCompressor not found for name: " + name);
+ public static PipeCompressor getCompressor(PipeCompressorConfig config) {
+ if (config == null) {
+ throw new IllegalArgumentException("PipeCompressorConfig is null");
}
- return compressor;
+ if (config.getName() == null) {
+ throw new IllegalArgumentException("PipeCompressorConfig.getName() is null");
+ }
+
+ final String compressorName = config.getName();
+
+ // For ZSTD compressor, we need to consider the compression level
+ if (compressorName.equals(CONNECTOR_COMPRESSOR_ZSTD)) {
+ final int zstdCompressionLevel = config.getZstdCompressionLevel();
+ return COMPRESSOR_NAME_TO_INSTANCE.computeIfAbsent(
+ CONNECTOR_COMPRESSOR_ZSTD + "_" + zstdCompressionLevel,
+ key -> {
+ LOGGER.info("Create new PipeZSTDCompressor with level: {}",
zstdCompressionLevel);
+ return new PipeZSTDCompressor(zstdCompressionLevel);
+ });
+ }
+
+ // For other compressors, we can directly get the instance by name
+ final PipeCompressor compressor =
COMPRESSOR_NAME_TO_INSTANCE.get(compressorName);
+ if (compressor != null) {
+ return compressor;
+ }
+
+ throw new UnsupportedOperationException("PipeCompressor not found for name: " + compressorName);
}
private static Map<Byte, PipeCompressor> COMPRESSOR_INDEX_TO_INSTANCE = new
HashMap<>();
static {
COMPRESSOR_INDEX_TO_INSTANCE.put(
- PipeCompressor.PipeCompressionType.SNAPPY.getIndex(), new
PipeSnappyCompressor());
+ PipeCompressor.PipeCompressionType.SNAPPY.getIndex(),
+ COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_SNAPPY));
COMPRESSOR_INDEX_TO_INSTANCE.put(
- PipeCompressor.PipeCompressionType.GZIP.getIndex(), new
PipeGZIPCompressor());
+ PipeCompressor.PipeCompressionType.GZIP.getIndex(),
+ COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_GZIP));
COMPRESSOR_INDEX_TO_INSTANCE.put(
- PipeCompressor.PipeCompressionType.LZ4.getIndex(), new
PipeLZ4Compressor());
+ PipeCompressor.PipeCompressionType.LZ4.getIndex(),
+ COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZ4));
COMPRESSOR_INDEX_TO_INSTANCE.put(
- PipeCompressor.PipeCompressionType.ZSTD.getIndex(), new
PipeZSTDCompressor());
+ PipeCompressor.PipeCompressionType.ZSTD.getIndex(),
+ COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_ZSTD));
COMPRESSOR_INDEX_TO_INSTANCE.put(
- PipeCompressor.PipeCompressionType.LZMA2.getIndex(), new
PipeLZMA2Compressor());
+ PipeCompressor.PipeCompressionType.LZMA2.getIndex(),
+ COMPRESSOR_NAME_TO_INSTANCE.get(CONNECTOR_COMPRESSOR_LZMA2));
COMPRESSOR_INDEX_TO_INSTANCE =
Collections.unmodifiableMap(COMPRESSOR_INDEX_TO_INSTANCE);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
index 72782353d68..50e2e1f845c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/compressor/PipeZSTDCompressor.java
@@ -19,29 +19,26 @@
package org.apache.iotdb.commons.pipe.connector.compressor;
-import org.apache.tsfile.compress.ICompressor;
-import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
+import com.github.luben.zstd.Zstd;
import java.io.IOException;
public class PipeZSTDCompressor extends PipeCompressor {
- private static final ICompressor COMPRESSOR =
ICompressor.getCompressor(CompressionType.ZSTD);
- private static final IUnCompressor DECOMPRESSOR =
- IUnCompressor.getUnCompressor(CompressionType.ZSTD);
+ private final int compressionLevel;
- public PipeZSTDCompressor() {
+ public PipeZSTDCompressor(int compressionLevel) {
super(PipeCompressionType.ZSTD);
+ this.compressionLevel = compressionLevel;
}
@Override
public byte[] compress(byte[] data) throws IOException {
- return COMPRESSOR.compress(data);
+ return Zstd.compress(data, compressionLevel);
}
@Override
- public byte[] decompress(byte[] byteArray) throws IOException {
- return DECOMPRESSOR.uncompress(byteArray);
+ public byte[] decompress(byte[] byteArray) {
+ return Zstd.decompress(byteArray, (int) Zstd.decompressedSize(byteArray,
0, byteArray.length));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index f65daf86d4f..ad8e328e45e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.pipe.connector.protocol;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
+import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorConfig;
import
org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
@@ -50,6 +51,10 @@ import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_SET;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE;
@@ -72,6 +77,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_ZSTD_LEVEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY;
@@ -151,6 +157,21 @@ public abstract class IoTDBConnector implements
PipeConnector {
CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
loadBalanceStrategy);
+ final int zstdCompressionLevel =
+ parameters.getIntOrDefault(
+ Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY,
SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
+ CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE);
+ validator.validate(
+ arg ->
+ (int) arg >= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE
+ && (int) arg <= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
+ String.format(
+ "Zstd compression level should be in the range [%d, %d], but got %d.",
+ CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE,
+ CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
+ zstdCompressionLevel),
+ zstdCompressionLevel);
+
final String compressionTypes =
parameters
.getStringOrDefault(
@@ -170,7 +191,9 @@ public abstract class IoTDBConnector implements
PipeConnector {
"Compressor should be one of %s, but got %s.",
CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
trimmedCompressionType);
-
compressors.add(PipeCompressorFactory.getCompressor(trimmedCompressionType));
+ compressors.add(
+ PipeCompressorFactory.getCompressor(
+ new PipeCompressorConfig(trimmedCompressionType,
zstdCompressionLevel)));
}
}
validator.validate(