This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ef2c3c7283 [Improve][Connector-V2] Redis support custom key and value (#7888)
ef2c3c7283 is described below
commit ef2c3c7283e7c1ff7ce445b71a62977ac5ed0403
Author: limin <[email protected]>
AuthorDate: Tue Oct 29 21:59:36 2024 +0800
[Improve][Connector-V2] Redis support custom key and value (#7888)
Co-authored-by: limin <[email protected]>
---
docs/en/connector-v2/sink/Redis.md | 134 +++++++++++++++++---
docs/zh/connector-v2/sink/Redis.md | 137 ++++++++++++++++++---
.../seatunnel/redis/config/RedisConfig.java | 26 ++++
.../seatunnel/redis/config/RedisParameters.java | 20 +++
.../seatunnel/redis/sink/RedisSinkFactory.java | 4 +
.../seatunnel/redis/sink/RedisSinkWriter.java | 113 +++++++++++++++--
.../connector/redis/RedisTestCaseTemplateIT.java | 85 +++++++++++++
.../redis-to-redis-custom-hash-key-and-value.conf | 119 ++++++++++++++++++
.../test/resources/redis-to-redis-custom-key.conf | 118 ++++++++++++++++++
.../redis-to-redis-custom-value-for-key.conf | 119 ++++++++++++++++++
.../redis-to-redis-custom-value-for-list.conf | 118 ++++++++++++++++++
.../redis-to-redis-custom-value-for-set.conf | 118 ++++++++++++++++++
.../redis-to-redis-custom-value-for-zset.conf | 118 ++++++++++++++++++
13 files changed, 1182 insertions(+), 47 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md
index b5f444bb11..5b37720891 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -12,21 +12,25 @@ Used to write data to Redis.
## Options
-| name | type | required | default value |
-|----------------|--------|-----------------------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| key | string | yes | - |
-| data_type | string | yes | - |
-| batch_size | int | no | 10 |
-| user | string | no | - |
-| auth | string | no | - |
-| db_num | int | no | 0 |
-| mode | string | no | single |
-| nodes | list | yes when mode=cluster | - |
-| format | string | no | json |
-| expire | long | no | -1 |
-| common-options | | no | - |
+| name | type | required | default value |
+|--------------------|---------|-----------------------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| key | string | yes | - |
+| data_type | string | yes | - |
+| batch_size | int | no | 10 |
+| user | string | no | - |
+| auth | string | no | - |
+| db_num | int | no | 0 |
+| mode | string | no | single |
+| nodes | list | yes when mode=cluster | - |
+| format | string | no | json |
+| expire | long | no | -1 |
+| support_custom_key | boolean | no | false |
+| value_field | string | no | - |
+| hash_key_field | string | no | - |
+| hash_value_field | string | no | - |
+| common-options | | no | - |
### host [string]
@@ -50,12 +54,12 @@ Upstream data is the following:
| 500 | internal error | false |
If you assign the field name to `code` and data_type to `key`, two entries will be written to Redis:
-1. `200 -> {code: 200, message: true, data: get success}`
-2. `500 -> {code: 500, message: false, data: internal error}`
+1. `200 -> {code: 200, data: get success, success: true}`
+2. `500 -> {code: 500, data: internal error, success: false}`
If you assign the field name to `value` and data_type to `key`, only one entry will be written to Redis because `value` does not exist in the upstream data's fields:
-1. `value -> {code: 500, message: false, data: internal error}`
+1. `value -> {code: 500, data: internal error, success: false}`
Please see the data_type section for specific writing rules.
@@ -85,7 +89,7 @@ Redis data types, support `key` `hash` `list` `set` `zset`
> Each data from upstream will be added to the configured zset key with a
> weight of 1. So the order of data in zset is based on the order of data
> consumption.
>
- ### batch_size [int]
+### batch_size [int]
ensure the batch write size in single-machine mode; no guarantees in cluster mode.
@@ -135,6 +139,61 @@ Connector will generate data as the following and write it to redis:
Set the Redis expiration time in seconds. The default value is -1, meaning keys do not expire automatically.
+### support_custom_key [boolean]
+
+If true, the key can be customized by a field value in the upstream data.
+
+Upstream data is the following:
+
+| code | data | success |
+|------|----------------|---------|
+| 200 | get success | true |
+| 500 | internal error | false |
+
+You can customize the Redis key using `{` and `}`: the field name inside `{}` will be replaced by the corresponding field value in the upstream data. For example, if you assign the field name to `{code}` and data_type to `key`, two entries will be written to Redis:
+1. `200 -> {code: 200, data: get success, success: true}`
+2. `500 -> {code: 500, data: internal error, success: false}`
+
+A Redis key can be composed of fixed and variable parts, joined by `:`. For example, if you assign the field name to `code:{code}` and data_type to `key`, two entries will be written to Redis:
+1. `code:200 -> {code: 200, data: get success, success: true}`
+2. `code:500 -> {code: 500, data: internal error, success: false}`
+
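The substitution rule above can be sketched in plain Java. This is a hypothetical helper for illustration only (`resolveKey` and the `Map`-based row are not the connector's actual API; the real implementation works on `SeaTunnelRow`):

```java
import java.util.Map;

public class KeyTemplateSketch {
    // Hypothetical helper mirroring the rule above: the key template is split
    // on ':', and each segment wrapped in '{' and '}' is replaced by the
    // matching upstream field value; unknown fields are kept as literals.
    static String resolveKey(String template, Map<String, Object> row) {
        String[] segments = template.split(":");
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < segments.length; i++) {
            String seg = segments[i];
            if (seg.startsWith("{") && seg.endsWith("}")) {
                String field = seg.substring(1, seg.length() - 1);
                key.append(row.containsKey(field) ? row.get(field).toString() : seg);
            } else {
                key.append(seg);
            }
            if (i != segments.length - 1) {
                key.append(":");
            }
        }
        return key.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("code", 200, "data", "get success", "success", true);
        System.out.println(resolveKey("code:{code}", row)); // code:200
        System.out.println(resolveKey("{code}", row));      // 200
    }
}
```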
+### value_field [string]
+
+The field whose value you want to write to Redis. `data_type` supports `key`, `list`, `set` and `zset`.
+
+For example, when you assign the field name to `value`, value_field to `data`, and data_type to `key`:
+
+Upstream data is the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+The following data will be written to redis:
+1. `value -> get success`
+
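The lookup described above can be sketched as follows. The names here are hypothetical (the real connector resolves the field index from the row type rather than a `List`/array pair):

```java
import java.util.List;

public class ValueFieldSketch {
    // Hypothetical helper: if value_field names an existing upstream column,
    // that column's value is written; otherwise the literal string is used.
    static String pickValue(String valueField, List<String> fields, Object[] row) {
        int idx = fields.indexOf(valueField);
        return idx >= 0 ? row[idx].toString() : valueField;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("code", "data", "success");
        Object[] row = {200, "get success", true};
        System.out.println(pickValue("data", fields, row)); // get success
    }
}
```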
+### hash_key_field [string]
+
+The field of the hash key you want to write to Redis. `data_type` supports `hash`.
+
+### hash_value_field [string]
+
+The field of the hash value you want to write to Redis. `data_type` supports `hash`.
+
+For example, when you assign the field name to `value`, hash_key_field to `data`, hash_value_field to `success`, and data_type to `hash`:
+
+Upstream data is the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+The following data will be written to redis:
+1. `value -> get success | true`
+
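The hash pair selection above can be sketched like this. `buildHashEntry` is a hypothetical helper (the actual connector serializes the pair to JSON before writing):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashFieldSketch {
    // Hypothetical helper: hash_key_field and hash_value_field each select an
    // upstream column; the resulting pair is stored under the configured key.
    static Map<String, String> buildHashEntry(
            String hashKeyField, String hashValueField, List<String> fields, Object[] row) {
        String hashKey =
                fields.contains(hashKeyField)
                        ? row[fields.indexOf(hashKeyField)].toString()
                        : hashKeyField;
        String hashValue =
                fields.contains(hashValueField)
                        ? row[fields.indexOf(hashValueField)].toString()
                        : hashValueField;
        Map<String, String> entry = new HashMap<>();
        entry.put(hashKey, hashValue);
        return entry;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("code", "data", "success");
        Object[] row = {200, "get success", true};
        System.out.println(buildHashEntry("data", "success", fields, row)); // {get success=true}
    }
}
```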
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details
@@ -152,6 +211,43 @@ Redis {
}
```
+custom key:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = "name:{name}"
+ support_custom_key = true
+ data_type = key
+}
+```
+
+custom value:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = person
+ value_field = "name"
+ data_type = key
+}
+```
+
+custom HashKey and HashValue:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = person
+ hash_key_field = "name"
+ hash_value_field = "age"
+ data_type = hash
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git a/docs/zh/connector-v2/sink/Redis.md b/docs/zh/connector-v2/sink/Redis.md
index b47d9de914..5640710de4 100644
--- a/docs/zh/connector-v2/sink/Redis.md
+++ b/docs/zh/connector-v2/sink/Redis.md
@@ -12,20 +12,25 @@
## 选项
-| 名称 | 类型 | 是否必须 | 默认值 |
-|----------------|--------|---------------------|--------|
-| host | string | 是 | - |
-| port | int | 是 | - |
-| key | string | 是 | - |
-| data_type | string | 是 | - |
-| user | string | 否 | - |
-| auth | string | 否 | - |
-| db_num | int | 否 | 0 |
-| mode | string | 否 | single |
-| nodes | list | 当 mode=cluster 时为:是 | - |
-| format | string | 否 | json |
-| expire | long | 否 | -1 |
-| common-options | | 否 | - |
+| name | type | required | default value |
+|--------------------|---------|-----------------------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| key | string | yes | - |
+| data_type | string | yes | - |
+| batch_size | int | no | 10 |
+| user | string | no | - |
+| auth | string | no | - |
+| db_num | int | no | 0 |
+| mode | string | no | single |
+| nodes | list | yes when mode=cluster | - |
+| format | string | no | json |
+| expire | long | no | -1 |
+| support_custom_key | boolean | no | false |
+| value_field | string | no | - |
+| hash_key_field | string | no | - |
+| hash_value_field | string | no | - |
+| common-options | | no | - |
### host [string]
@@ -48,13 +53,17 @@ Redis 端口
| 200 | 获取成功 | true |
| 500 | 内部错误 | false |
-如果将字段名称指定为 `code` 并将 data_type 设置为 `key`,将有两个数据写入 Redis:
-1. `200 -> {code: 200, message: true, data: 获取成功}`
-2. `500 -> {code: 500, message: false, data: 内部错误}`
+可以使用 `{` 和 `}` 符号自定义 Redis 键名,`{}` 中的字段名会被解析替换为上游数据中对应的字段值。例如:将字段名称指定为 `{code}` 并将 data_type 设置为 `key`,将有两条数据写入 Redis:
+1. `200 -> {code: 200, data: 获取成功, success: true}`
+2. `500 -> {code: 500, data: 内部错误, success: false}`
-如果将字段名称指定为 `value` 并将 data_type 设置为 `key`,则由于上游数据的字段中没有 `value` 字段,将只有一个数据写入 Redis:
+Redis 键名可以由固定部分和变化部分组成,通过 Redis 分组符号 `:` 连接。例如:将字段名称指定为 `code:{code}` 并将 data_type 设置为 `key`,将有两条数据写入 Redis:
+1. `code:200 -> {code: 200, data: 获取成功, success: true}`
+2. `code:500 -> {code: 500, data: 内部错误, success: false}`
-1. `value -> {code: 500, message: false, data: 内部错误}`
+如果将Redis键名指定为 `value` 并将 data_type 设置为 `key`,则只有一个数据写入 Redis:
+
+1. `value -> {code: 500, data: 内部错误, success: false}`
请参见 data_type 部分以了解具体的写入规则。
@@ -128,6 +137,59 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
设置 Redis 的过期时间,单位为秒。默认值为 -1,表示键不会自动过期。
+### support_custom_key [boolean]
+
+设置为 true 时,表示启用自定义 Key,可以使用上游数据中的字段值自定义 Redis 键名。
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200 | 获取成功 | true |
+| 500 | 内部错误 | false |
+
+可以使用 `{` 和 `}` 符号自定义 Redis 键名,`{}` 中的字段名会被解析替换为上游数据中对应的字段值。例如:将字段名称指定为 `{code}` 并将 data_type 设置为 `key`,将有两条数据写入 Redis:
+1. `200 -> {code: 200, data: 获取成功, success: true}`
+2. `500 -> {code: 500, data: 内部错误, success: false}`
+
+Redis 键名可以由固定部分和变化部分组成,通过 Redis 分组符号 `:` 连接。例如:将字段名称指定为 `code:{code}` 并将 data_type 设置为 `key`,将有两条数据写入 Redis:
+1. `code:200 -> {code: 200, data: 获取成功, success: true}`
+2. `code:500 -> {code: 500, data: 内部错误, success: false}`
+
+### value_field [string]
+
+要写入 Redis 的值字段,`data_type` 支持 `key` `list` `set` `zset`。
+
+当你将 Redis 键名 `key` 指定为 `value`,值字段 `value_field` 指定为 `data`,并将 `data_type` 指定为 `key` 时:
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200 | 获取成功 | true |
+
+如下的数据会被写入Redis:
+1. `value -> 获取成功`
+
+### hash_key_field [string]
+
+要写入 Redis 的 hash 键字段,`data_type` 支持 `hash`。
+
+### hash_value_field [string]
+
+要写入 Redis 的 hash 值字段,`data_type` 支持 `hash`。
+
+当你将 Redis 键名 `key` 指定为 `value`,hash 键字段 `hash_key_field` 指定为 `data`,hash 值字段 `hash_value_field` 指定为 `success`,并将 `data_type` 指定为 `hash` 时:
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200 | 获取成功 | true |
+
+如下的数据会被写入Redis:
+1. `value -> 获取成功 | true`
+
### common options
Sink 插件通用参数,请参考 [Sink Common Options](../sink-common-options.md) 获取详情
@@ -145,6 +207,43 @@ Redis {
}
```
+自定义Key示例:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = "name:{name}"
+ support_custom_key = true
+ data_type = key
+}
+```
+
+自定义Value示例:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = person
+ value_field = "name"
+ data_type = key
+}
+```
+
+自定义HashKey和HashValue示例:
+
+```hocon
+Redis {
+ host = localhost
+ port = 6379
+ key = person
+ hash_key_field = "name"
+ hash_value_field = "age"
+ data_type = hash
+}
+```
+
## 更新日志
### 2.2.0-beta 2022-09-26
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 3be5b39de9..c9809868dc 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -127,6 +127,32 @@ public class RedisConfig {
                            "batch_size is used to control the size of a batch of data during read and write operations"
                                    + ",default 10");
+ public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
+ Options.key("support_custom_key")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+                            "if true, the key can be customized by the field value in the upstream data.");
+
+ public static final Option<String> VALUE_FIELD =
+ Options.key("value_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+                            "The field of value you want to write to redis, support string list set zset");
+
+ public static final Option<String> HASH_KEY_FIELD =
+ Options.key("hash_key_field")
+ .stringType()
+ .noDefaultValue()
+                    .withDescription("The field of hash key you want to write to redis");
+
+ public static final Option<String> HASH_VALUE_FIELD =
+ Options.key("hash_value_field")
+ .stringType()
+ .noDefaultValue()
+                    .withDescription("The field of hash value you want to write to redis");
+
public enum Format {
JSON,
// TEXT will be supported later
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 3d7e954f1d..6dff3cba71 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -57,6 +57,10 @@ public class RedisParameters implements Serializable {
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
+ private Boolean supportCustomKey;
+ private String valueField;
+ private String hashKeyField;
+ private String hashValueField;
private int redisVersion;
@@ -97,6 +101,22 @@ public class RedisParameters implements Serializable {
this.redisDataType = config.get(RedisConfig.DATA_TYPE);
        // Indicates the number of keys to attempt to return per iteration, default 10
this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+ // set support custom key
+ if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) {
+ this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY);
+ }
+ // set value field
+ if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
+ this.valueField = config.get(RedisConfig.VALUE_FIELD);
+ }
+ // set hash key field
+ if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
+ this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
+ }
+ // set hash value field
+ if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
+ this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
+ }
}
public RedisClient buildRedisClient() {
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 49c2644d70..38098a2560 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -53,6 +53,10 @@ public class RedisSinkFactory implements TableSinkFactory {
RedisConfig.KEY_PATTERN,
RedisConfig.FORMAT,
RedisConfig.EXPIRE,
+ RedisConfig.SUPPORT_CUSTOM_KEY,
+ RedisConfig.VALUE_FIELD,
+ RedisConfig.HASH_KEY_FIELD,
+ RedisConfig.HASH_VALUE_FIELD,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES)
.build();
diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index f03c5c48c8..9d5c73df2c 100644
--- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
@@ -29,13 +30,20 @@ import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
+ private static final String REDIS_GROUP_DELIMITER = ":";
+ private static final String LEFT_PLACEHOLDER_MARKER = "{";
+ private static final String RIGHT_PLACEHOLDER_MARKER = "}";
private final SeaTunnelRowType seaTunnelRowType;
private final RedisParameters redisParameters;
private final SerializationSchema serializationSchema;
@@ -60,23 +68,110 @@ public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
@Override
public void write(SeaTunnelRow element) throws IOException {
- String data = new String(serializationSchema.serialize(element));
- String keyField = redisParameters.getKeyField();
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
- String key;
- if (fields.contains(keyField)) {
- key = element.getField(fields.indexOf(keyField)).toString();
- } else {
- key = keyField;
- }
+ String key = getKey(element, fields);
keyBuffer.add(key);
- valueBuffer.add(data);
+ String value = getValue(element, fields);
+ valueBuffer.add(value);
if (keyBuffer.size() >= batchSize) {
doBatchWrite();
clearBuffer();
}
}
+ private String getKey(SeaTunnelRow element, List<String> fields) {
+ String key = redisParameters.getKeyField();
+ Boolean supportCustomKey = redisParameters.getSupportCustomKey();
+ if (Boolean.TRUE.equals(supportCustomKey)) {
+ return getCustomKey(element, fields, key);
+ }
+ return getNormalKey(element, fields, key);
+ }
+
+    private static String getNormalKey(SeaTunnelRow element, List<String> fields, String keyField) {
+ if (fields.contains(keyField)) {
+ return element.getField(fields.indexOf(keyField)).toString();
+ } else {
+ return keyField;
+ }
+ }
+
+    private String getCustomKey(SeaTunnelRow element, List<String> fields, String keyField) {
+ String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
+ StringBuilder key = new StringBuilder();
+ for (int i = 0; i < keyFieldSegments.length; i++) {
+ String keyFieldSegment = keyFieldSegments[i];
+ if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
+ && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
+                String realKeyField = keyFieldSegment.substring(1, keyFieldSegment.length() - 1);
+ if (fields.contains(realKeyField)) {
+                    key.append(element.getField(fields.indexOf(realKeyField)).toString());
+ } else {
+ key.append(keyFieldSegment);
+ }
+ } else {
+ key.append(keyFieldSegment);
+ }
+ if (i != keyFieldSegments.length - 1) {
+ key.append(REDIS_GROUP_DELIMITER);
+ }
+ }
+ return key.toString();
+ }
+
+ private String getValue(SeaTunnelRow element, List<String> fields) {
+ String value;
+ RedisDataType redisDataType = redisParameters.getRedisDataType();
+ if (RedisDataType.HASH.equals(redisDataType)) {
+ value = handleHashType(element, fields);
+ } else {
+ value = handleOtherTypes(element, fields);
+ }
+ if (value == null) {
+ byte[] serialize = serializationSchema.serialize(element);
+ value = new String(serialize);
+ }
+ return value;
+ }
+
+ private String handleHashType(SeaTunnelRow element, List<String> fields) {
+ String hashKeyField = redisParameters.getHashKeyField();
+ String hashValueField = redisParameters.getHashValueField();
+ if (StringUtils.isEmpty(hashKeyField)) {
+ return null;
+ }
+ String hashKey;
+ if (fields.contains(hashKeyField)) {
+            hashKey = element.getField(fields.indexOf(hashKeyField)).toString();
+ } else {
+ hashKey = hashKeyField;
+ }
+ String hashValue;
+ if (StringUtils.isEmpty(hashValueField)) {
+ hashValue = new String(serializationSchema.serialize(element));
+ } else {
+ if (fields.contains(hashValueField)) {
+                hashValue = element.getField(fields.indexOf(hashValueField)).toString();
+ } else {
+ hashValue = hashValueField;
+ }
+ }
+ Map<String, String> kvMap = new HashMap<>();
+ kvMap.put(hashKey, hashValue);
+ return JsonUtils.toJsonString(kvMap);
+ }
+
+    private String handleOtherTypes(SeaTunnelRow element, List<String> fields) {
+ String valueField = redisParameters.getValueField();
+ if (StringUtils.isEmpty(valueField)) {
+ return null;
+ }
+ if (fields.contains(valueField)) {
+ return element.getField(fields.indexOf(valueField)).toString();
+ }
+ return valueField;
+ }
+
private void clearBuffer() {
keyBuffer.clear();
valueBuffer.clear();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 60bffba6f4..0f67575ea4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -357,5 +357,90 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
jedis.select(0);
}
+ @TestTemplate
+ public void testCustomKeyWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/redis-to-redis-custom-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int count = 0;
+ for (int i = 0; i < 100; i++) {
+ String data = jedis.get("custom-key-check:" + i);
+ if (data != null) {
+ count++;
+ }
+ }
+ Assertions.assertEquals(100, count);
+ for (int i = 0; i < 100; i++) {
+ jedis.del("custom-key-check:" + i);
+ }
+ }
+
+ @TestTemplate
+ public void testCustomValueForStringWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/redis-to-redis-custom-value-for-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ int count = 0;
+ for (int i = 0; i < 100; i++) {
+ String data = jedis.get("custom-value-check:" + i);
+ if (data != null) {
+ Assertions.assertEquals("string", data);
+ count++;
+ }
+ }
+ Assertions.assertEquals(100, count);
+ for (int i = 0; i < 100; i++) {
+ jedis.del("custom-value-check:" + i);
+ }
+ }
+
+ @TestTemplate
+ public void testCustomValueForListWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/redis-to-redis-custom-value-for-list.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("custom-value-check-list", 0, -1);
+ Assertions.assertEquals(100, list.size());
+ jedis.del("custom-value-check-list");
+ }
+
+ @TestTemplate
+ public void testCustomValueForSetWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/redis-to-redis-custom-value-for-set.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ long amount = jedis.scard("custom-value-check-set");
+ Assertions.assertEquals(100, amount);
+ jedis.del("custom-value-check-set");
+ }
+
+ @TestTemplate
+ public void testCustomValueForZSetWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/redis-to-redis-custom-value-for-zset.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ long amount = jedis.zcard("custom-value-check-zset");
+ Assertions.assertEquals(100, amount);
+ jedis.del("custom-value-check-zset");
+ }
+
+ @TestTemplate
+ public void testCustomHashKeyAndValueWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/redis-to-redis-custom-hash-key-and-value.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ long amount = jedis.hlen("custom-hash-check");
+ Assertions.assertEquals(100, amount);
+ for (int i = 0; i < 100; i++) {
+            Assertions.assertEquals("string", jedis.hget("custom-hash-check", String.valueOf(i)));
+ }
+ jedis.del("custom-hash-check");
+ }
+
public abstract RedisContainerInfo getRedisContainerInfo();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf
new file mode 100644
index 0000000000..ecdbbaccea
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-hash-check"
+ hash_key_field = "id"
+ hash_value_field = "c_string"
+ data_type = hash
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf
new file mode 100644
index 0000000000..aa1171546f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-key-check:{id}"
+ support_custom_key = true
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf
new file mode 100644
index 0000000000..05dfa51040
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-value-check:{id}"
+ support_custom_key = true
+ value_field = "c_string"
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file
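The sink above additionally sets `value_field = "c_string"`, so only that column is written as the value instead of the whole serialized row. A minimal sketch of that selection, under the assumption that rows without a `value_field` fall back to JSON serialization (`render_value` is a hypothetical helper):

```python
import json
from typing import Optional

def render_value(row: dict, value_field: Optional[str] = None) -> str:
    """If value_field is configured, write only that column's value;
    otherwise serialize the whole row as JSON (sketch; the real
    connector's encoding may differ)."""
    if value_field is not None:
        return str(row[value_field])
    return json.dumps(row)

row = {"id": 1, "c_string": "hello"}
print(render_value(row, "c_string"))  # hello
```
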
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf
new file mode 100644
index 0000000000..260d7b010b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-value-check-list"
+ value_field = "c_string"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf
new file mode 100644
index 0000000000..28acbd70ca
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-value-check-set"
+ value_field = "id"
+ data_type = set
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf
new file mode 100644
index 0000000000..b862127c91
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "custom-value-check-zset"
+ value_field = "id"
+ data_type = zset
+ batch_size = 33
+ }
+}
\ No newline at end of file
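All of the sink configs above set `batch_size = 33`, i.e. writes are flushed to Redis in groups of at most 33. A pure-Python sketch of that grouping, independent of any Redis client (`batches` is a hypothetical helper illustrating the granularity, not the connector's code):

```python
def batches(values: list, batch_size: int = 33) -> list:
    """Split values into consecutive groups of at most batch_size,
    the granularity at which the sink flushes writes to Redis."""
    return [values[i:i + batch_size] for i in range(0, len(values), batch_size)]

groups = batches(list(range(100)), 33)
print([len(g) for g in groups])  # [33, 33, 33, 1]
```

In the real connector each such group would typically be sent in one pipelined round trip, amortizing network latency across the batch.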