This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ef2c3c7283 [Improve][Connector-V2] Redis support custom key and value 
(#7888)
ef2c3c7283 is described below

commit ef2c3c7283e7c1ff7ce445b71a62977ac5ed0403
Author: limin <[email protected]>
AuthorDate: Tue Oct 29 21:59:36 2024 +0800

    [Improve][Connector-V2] Redis support custom key and value (#7888)
    
    Co-authored-by: limin <[email protected]>
---
 docs/en/connector-v2/sink/Redis.md                 | 134 +++++++++++++++++---
 docs/zh/connector-v2/sink/Redis.md                 | 137 ++++++++++++++++++---
 .../seatunnel/redis/config/RedisConfig.java        |  26 ++++
 .../seatunnel/redis/config/RedisParameters.java    |  20 +++
 .../seatunnel/redis/sink/RedisSinkFactory.java     |   4 +
 .../seatunnel/redis/sink/RedisSinkWriter.java      | 113 +++++++++++++++--
 .../connector/redis/RedisTestCaseTemplateIT.java   |  85 +++++++++++++
 .../redis-to-redis-custom-hash-key-and-value.conf  | 119 ++++++++++++++++++
 .../test/resources/redis-to-redis-custom-key.conf  | 118 ++++++++++++++++++
 .../redis-to-redis-custom-value-for-key.conf       | 119 ++++++++++++++++++
 .../redis-to-redis-custom-value-for-list.conf      | 118 ++++++++++++++++++
 .../redis-to-redis-custom-value-for-set.conf       | 118 ++++++++++++++++++
 .../redis-to-redis-custom-value-for-zset.conf      | 118 ++++++++++++++++++
 13 files changed, 1182 insertions(+), 47 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index b5f444bb11..5b37720891 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -12,21 +12,25 @@ Used to write data to Redis.
 
 ## Options
 
-|      name      |  type  |       required        | default value |
-|----------------|--------|-----------------------|---------------|
-| host           | string | yes                   | -             |
-| port           | int    | yes                   | -             |
-| key            | string | yes                   | -             |
-| data_type      | string | yes                   | -             |
-| batch_size     | int    | no                    | 10            |
-| user           | string | no                    | -             |
-| auth           | string | no                    | -             |
-| db_num         | int    | no                    | 0             |
-| mode           | string | no                    | single        |
-| nodes          | list   | yes when mode=cluster | -             |
-| format         | string | no                    | json          |
-| expire         | long   | no                    | -1            |
-| common-options |        | no                    | -             |
+| name               | type    |       required        | default value |
+|--------------------|---------|-----------------------|---------------|
+| host               | string  | yes                   | -             |
+| port               | int     | yes                   | -             |
+| key                | string  | yes                   | -             |
+| data_type          | string  | yes                   | -             |
+| batch_size         | int     | no                    | 10            |
+| user               | string  | no                    | -             |
+| auth               | string  | no                    | -             |
+| db_num             | int     | no                    | 0             |
+| mode               | string  | no                    | single        |
+| nodes              | list    | yes when mode=cluster | -             |
+| format             | string  | no                    | json          |
+| expire             | long    | no                    | -1            |
+| support_custom_key | boolean | no                    | false         |
+| value_field        | string  | no                    | -             |
+| hash_key_field     | string  | no                    | -             |
+| hash_value_field   | string  | no                    | -             |
+| common-options     |         | no                    | -             |
 
 ### host [string]
 
@@ -50,12 +54,12 @@ Upstream data is the following:
 | 500  | internal error | false   |
 
 If you assign field name to `code` and data_type to `key`, two data will be 
written to redis:
-1. `200 -> {code: 200, message: true, data: get success}`
-2. `500 -> {code: 500, message: false, data: internal error}`
+1. `200 -> {code: 200, data: get success, success: true}`
+2. `500 -> {code: 500, data: internal error, success: false}`
 
 If you assign field name to `value` and data_type to `key`, only one data will 
be written to redis because `value` is not existed in upstream data's fields:
 
-1. `value -> {code: 500, message: false, data: internal error}`
+1. `value -> {code: 500, data: internal error, success: false}`
 
 Please see the data_type section for specific writing rules.
 
@@ -85,7 +89,7 @@ Redis data types, support `key` `hash` `list` `set` `zset`
 
 > Each data from upstream will be added to the configured zset key with a 
 > weight of 1. So the order of data in zset is based on the order of data 
 > consumption.
 >
-  ### batch_size [int]
+### batch_size [int]
 
 ensure the batch write size in single-machine mode; no guarantees in cluster 
mode.
 
@@ -135,6 +139,61 @@ Connector will generate data as the following and write it 
to redis:
 
 Set redis expiration time, the unit is second. The default value is -1, keys 
do not automatically expire by default.
 
+### support_custom_key [boolean]
+
+If true, the key can be customized using field values from the upstream data.
+
+Upstream data is the following:
+
+| code |      data      | success |
+|------|----------------|---------|
+| 200  | get success    | true    |
+| 500  | internal error | false   |
+
+You can customize the Redis key using '{' and '}'; the field name inside '{}' 
will be parsed and replaced by that field's value in the upstream data. For 
example, if you set `key` to `{code}` and data_type to `key`, two records 
will be written to redis:
+1. `200 -> {code: 200, data: get success, success: true}`
+2. `500 -> {code: 500, data: internal error, success: false}`
+
+The Redis key can be composed of fixed and variable parts, connected by ':'. For 
example, if you set `key` to `code:{code}` and data_type to `key`, two 
records will be written to redis:
+1. `code:200 -> {code: 200, data: get success, success: true}`
+2. `code:500 -> {code: 500, data: internal error, success: false}`
+
+### value_field [string]
+
+The field whose value you want to write to redis. Supported when `data_type` is `key`, 
`list`, `set` or `zset`.
+
+For example, when `key` is set to `value`, value_field to `data` and data_type 
to `key`:
+
+Upstream data is the following:
+
+| code |    data     | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+The following data will be written to redis:
+1. `value -> get success`
+
+### hash_key_field [string]
+
+The field whose value is used as the hash key written to redis. Supported when `data_type` is `hash`.
+
+### hash_value_field [string]
+
+The field whose value is used as the hash value written to redis. Supported when `data_type` is `hash`.
+
+For example, when `key` is set to `value`, hash_key_field to `data`, 
hash_value_field to `success` and data_type to `hash`:
+
+Upstream data is the following:
+
+| code |    data     | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+Connector will generate data as the following and write it to redis:
+
+The following data will be written to redis:
+1. `value -> get success | true`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details
@@ -152,6 +211,43 @@ Redis {
 }
 ```
 
+custom key:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = "name:{name}"
+  support_custom_key = true
+  data_type = key
+}
+```
+
+custom value:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = person
+  value_field = "name"
+  data_type = key
+}
+```
+
+custom HashKey and HashValue:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = person
+  hash_key_field = "name"
+  hash_value_field = "age"
+  data_type = hash
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git a/docs/zh/connector-v2/sink/Redis.md 
b/docs/zh/connector-v2/sink/Redis.md
index b47d9de914..5640710de4 100644
--- a/docs/zh/connector-v2/sink/Redis.md
+++ b/docs/zh/connector-v2/sink/Redis.md
@@ -12,20 +12,25 @@
 
 ## 选项
 
-|       名称       |   类型   |        是否必须         |  默认值   |
-|----------------|--------|---------------------|--------|
-| host           | string | 是                   | -      |
-| port           | int    | 是                   | -      |
-| key            | string | 是                   | -      |
-| data_type      | string | 是                   | -      |
-| user           | string | 否                   | -      |
-| auth           | string | 否                   | -      |
-| db_num         | int    | 否                   | 0      |
-| mode           | string | 否                   | single |
-| nodes          | list   | 当 mode=cluster 时为:是 | -      |
-| format         | string | 否                   | json   |
-| expire         | long   | 否                   | -1     |
-| common-options |        | 否                   | -      |
+| name               | type    |       required        | default value |
+|--------------------|---------|-----------------------|---------------|
+| host               | string  | yes                   | -             |
+| port               | int     | yes                   | -             |
+| key                | string  | yes                   | -             |
+| data_type          | string  | yes                   | -             |
+| batch_size         | int     | no                    | 10            |
+| user               | string  | no                    | -             |
+| auth               | string  | no                    | -             |
+| db_num             | int     | no                    | 0             |
+| mode               | string  | no                    | single        |
+| nodes              | list    | yes when mode=cluster | -             |
+| format             | string  | no                    | json          |
+| expire             | long    | no                    | -1            |
+| support_custom_key | boolean | no                    | false         |
+| value_field        | string  | no                    | -             |
+| hash_key_field     | string  | no                    | -             |
+| hash_value_field   | string  | no                    | -             |
+| common-options     |         | no                    | -             |
 
 ### host [string]
 
@@ -48,13 +53,17 @@ Redis 端口
 | 200  | 获取成功 | true    |
 | 500  | 内部错误 | false   |
 
-如果将字段名称指定为 `code` 并将 data_type 设置为 `key`,将有两个数据写入 Redis:
-1. `200 -> {code: 200, message: true, data: 获取成功}`
-2. `500 -> {code: 500, message: false, data: 内部错误}`
+可以使用`{`和`}`符号自定义Redis键名,`{}`中的字段名会被解析替换为上游数据中的某个字段值,例如:将字段名称指定为 `{code}` 并将 
data_type 设置为 `key`,将有两个数据写入 Redis:
+1. `200 -> {code: 200, data: 获取成功, success: true}`
+2. `500 -> {code: 500, data: 内部错误, success: false}`
 
-如果将字段名称指定为 `value` 并将 data_type 设置为 `key`,则由于上游数据的字段中没有 `value` 字段,将只有一个数据写入 
Redis:
+Redis键名可以由固定部分和变化部分组成,通过Redis分组符号:连接,例如:将字段名称指定为 `code:{code}` 并将 data_type 
设置为 `key`,将有两个数据写入 Redis:
+1. `code:200 -> {code: 200, data: 获取成功, success: true}`
+2. `code:500 -> {code: 500, data: 内部错误, success: false}`
 
-1. `value -> {code: 500, message: false, data: 内部错误}`
+如果将Redis键名指定为 `value` 并将 data_type 设置为 `key`,则只有一个数据写入 Redis:
+
+1. `value -> {code: 500, data: 内部错误, success: false}`
 
 请参见 data_type 部分以了解具体的写入规则。
 
@@ -128,6 +137,59 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
 
 设置 Redis 的过期时间,单位为秒。默认值为 -1,表示键不会自动过期。
 
+### support_custom_key [boolean]
+
+设置为true,表示启用自定义Key。
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200  | 获取成功 | true    |
+| 500  | 内部错误 | false   |
+
+可以使用`{`和`}`符号自定义Redis键名,`{}`中的字段名会被解析替换为上游数据中的某个字段值,例如:将字段名称指定为 `{code}` 并将 
data_type 设置为 `key`,将有两个数据写入 Redis:
+1. `200 -> {code: 200, data: 获取成功, success: true}`
+2. `500 -> {code: 500, data: 内部错误, success: false}`
+
+Redis键名可以由固定部分和变化部分组成,通过Redis分组符号:连接,例如:将字段名称指定为 `code:{code}` 并将 data_type 
设置为 `key`,将有两个数据写入 Redis:
+1. `code:200 -> {code: 200, data: 获取成功, success: true}`
+2. `code:500 -> {code: 500, data: 内部错误, success: false}`
+
+### value_field [string]
+
+要写入Redis的值的字段, `data_type` 支持 `key` `list` `set` `zset`.
+
+当你将Redis键名字段`key`指定为 `value`,值字段`value_field`指定为`data`,并将`data_type`指定为`key`时,
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200  | 获取成功 | true    |
+
+如下的数据会被写入Redis:
+1. `value -> 获取成功`
+
+### hash_key_field [string]
+
+要写入Redis的hash键字段, `data_type` 支持 `hash`
+
+### hash_value_field [string]
+
+要写入Redis的hash值字段, `data_type` 支持 `hash`
+
+当你将Redis键名字段`key`指定为 
`value`,hash键字段`hash_key_field`指定为`data`,hash值字段`hash_value_field`指定为`success`,并将`data_type`指定为`hash`时,
+
+上游数据如下:
+
+| code | data | success |
+|------|------|---------|
+| 200  | 获取成功 | true    |
+
+如下的数据会被写入Redis:
+1. `value -> 获取成功 | true`
+
 ### common options
 
 Sink 插件通用参数,请参考 [Sink Common Options](../sink-common-options.md) 获取详情
@@ -145,6 +207,43 @@ Redis {
 }
 ```
 
+自定义Key示例:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = "name:{name}"
+  support_custom_key = true
+  data_type = key
+}
+```
+
+自定义Value示例:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = person
+  value_field = "name"
+  data_type = key
+}
+```
+
+自定义HashKey和HashValue示例:
+
+```hocon
+Redis {
+  host = localhost
+  port = 6379
+  key = person
+  hash_key_field = "name"
+  hash_value_field = "age"
+  data_type = hash
+}
+```
+
 ## 更新日志
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 3be5b39de9..c9809868dc 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -127,6 +127,32 @@ public class RedisConfig {
                             "batch_size is used to control the size of a batch 
of data during read and write operations"
                                     + ",default 10");
 
+    public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
+            Options.key("support_custom_key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "if true, the key can be customized by the field 
value in the upstream data.");
+
+    public static final Option<String> VALUE_FIELD =
+            Options.key("value_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The field of value you want to write to redis, 
support string list set zset");
+
+    public static final Option<String> HASH_KEY_FIELD =
+            Options.key("hash_key_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field of hash key you want to write 
to redis");
+
+    public static final Option<String> HASH_VALUE_FIELD =
+            Options.key("hash_value_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The field of hash value you want to 
write to redis");
+
     public enum Format {
         JSON,
         // TEXT will be supported later
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 3d7e954f1d..6dff3cba71 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -57,6 +57,10 @@ public class RedisParameters implements Serializable {
     private List<String> redisNodes = Collections.emptyList();
     private long expire = RedisConfig.EXPIRE.defaultValue();
     private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
+    private Boolean supportCustomKey;
+    private String valueField;
+    private String hashKeyField;
+    private String hashValueField;
 
     private int redisVersion;
 
@@ -97,6 +101,22 @@ public class RedisParameters implements Serializable {
         this.redisDataType = config.get(RedisConfig.DATA_TYPE);
         // Indicates the number of keys to attempt to return per 
iteration.default 10
         this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+        // set support custom key
+        if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) {
+            this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY);
+        }
+        // set value field
+        if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
+            this.valueField = config.get(RedisConfig.VALUE_FIELD);
+        }
+        // set hash key field
+        if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
+            this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
+        }
+        // set hash value field
+        if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
+            this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
+        }
     }
 
     public RedisClient buildRedisClient() {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index 49c2644d70..38098a2560 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -53,6 +53,10 @@ public class RedisSinkFactory implements TableSinkFactory {
                         RedisConfig.KEY_PATTERN,
                         RedisConfig.FORMAT,
                         RedisConfig.EXPIRE,
+                        RedisConfig.SUPPORT_CUSTOM_KEY,
+                        RedisConfig.VALUE_FIELD,
+                        RedisConfig.HASH_KEY_FIELD,
+                        RedisConfig.HASH_VALUE_FIELD,
                         SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
                 .build();
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index f03c5c48c8..9d5c73df2c 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
@@ -29,13 +30,20 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
+    private static final String REDIS_GROUP_DELIMITER = ":";
+    private static final String LEFT_PLACEHOLDER_MARKER = "{";
+    private static final String RIGHT_PLACEHOLDER_MARKER = "}";
     private final SeaTunnelRowType seaTunnelRowType;
     private final RedisParameters redisParameters;
     private final SerializationSchema serializationSchema;
@@ -60,23 +68,110 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        String data = new String(serializationSchema.serialize(element));
-        String keyField = redisParameters.getKeyField();
         List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
-        String key;
-        if (fields.contains(keyField)) {
-            key = element.getField(fields.indexOf(keyField)).toString();
-        } else {
-            key = keyField;
-        }
+        String key = getKey(element, fields);
         keyBuffer.add(key);
-        valueBuffer.add(data);
+        String value = getValue(element, fields);
+        valueBuffer.add(value);
         if (keyBuffer.size() >= batchSize) {
             doBatchWrite();
             clearBuffer();
         }
     }
 
+    private String getKey(SeaTunnelRow element, List<String> fields) {
+        String key = redisParameters.getKeyField();
+        Boolean supportCustomKey = redisParameters.getSupportCustomKey();
+        if (Boolean.TRUE.equals(supportCustomKey)) {
+            return getCustomKey(element, fields, key);
+        }
+        return getNormalKey(element, fields, key);
+    }
+
+    private static String getNormalKey(SeaTunnelRow element, List<String> 
fields, String keyField) {
+        if (fields.contains(keyField)) {
+            return element.getField(fields.indexOf(keyField)).toString();
+        } else {
+            return keyField;
+        }
+    }
+
+    private String getCustomKey(SeaTunnelRow element, List<String> fields, 
String keyField) {
+        String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
+        StringBuilder key = new StringBuilder();
+        for (int i = 0; i < keyFieldSegments.length; i++) {
+            String keyFieldSegment = keyFieldSegments[i];
+            if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
+                    && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
+                String realKeyField = keyFieldSegment.substring(1, 
keyFieldSegment.length() - 1);
+                if (fields.contains(realKeyField)) {
+                    
key.append(element.getField(fields.indexOf(realKeyField)).toString());
+                } else {
+                    key.append(keyFieldSegment);
+                }
+            } else {
+                key.append(keyFieldSegment);
+            }
+            if (i != keyFieldSegments.length - 1) {
+                key.append(REDIS_GROUP_DELIMITER);
+            }
+        }
+        return key.toString();
+    }
+
+    private String getValue(SeaTunnelRow element, List<String> fields) {
+        String value;
+        RedisDataType redisDataType = redisParameters.getRedisDataType();
+        if (RedisDataType.HASH.equals(redisDataType)) {
+            value = handleHashType(element, fields);
+        } else {
+            value = handleOtherTypes(element, fields);
+        }
+        if (value == null) {
+            byte[] serialize = serializationSchema.serialize(element);
+            value = new String(serialize);
+        }
+        return value;
+    }
+
+    private String handleHashType(SeaTunnelRow element, List<String> fields) {
+        String hashKeyField = redisParameters.getHashKeyField();
+        String hashValueField = redisParameters.getHashValueField();
+        if (StringUtils.isEmpty(hashKeyField)) {
+            return null;
+        }
+        String hashKey;
+        if (fields.contains(hashKeyField)) {
+            hashKey = 
element.getField(fields.indexOf(hashKeyField)).toString();
+        } else {
+            hashKey = hashKeyField;
+        }
+        String hashValue;
+        if (StringUtils.isEmpty(hashValueField)) {
+            hashValue = new String(serializationSchema.serialize(element));
+        } else {
+            if (fields.contains(hashValueField)) {
+                hashValue = 
element.getField(fields.indexOf(hashValueField)).toString();
+            } else {
+                hashValue = hashValueField;
+            }
+        }
+        Map<String, String> kvMap = new HashMap<>();
+        kvMap.put(hashKey, hashValue);
+        return JsonUtils.toJsonString(kvMap);
+    }
+
+    private String handleOtherTypes(SeaTunnelRow element, List<String> fields) 
{
+        String valueField = redisParameters.getValueField();
+        if (StringUtils.isEmpty(valueField)) {
+            return null;
+        }
+        if (fields.contains(valueField)) {
+            return element.getField(fields.indexOf(valueField)).toString();
+        }
+        return valueField;
+    }
+
     private void clearBuffer() {
         keyBuffer.clear();
         valueBuffer.clear();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 60bffba6f4..0f67575ea4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -357,5 +357,90 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
         jedis.select(0);
     }
 
+    @TestTemplate
+    public void testCustomKeyWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/redis-to-redis-custom-key.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int count = 0;
+        for (int i = 0; i < 100; i++) {
+            String data = jedis.get("custom-key-check:" + i);
+            if (data != null) {
+                count++;
+            }
+        }
+        Assertions.assertEquals(100, count);
+        for (int i = 0; i < 100; i++) {
+            jedis.del("custom-key-check:" + i);
+        }
+    }
+
+    @TestTemplate
+    public void testCustomValueForStringWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/redis-to-redis-custom-value-for-key.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        int count = 0;
+        for (int i = 0; i < 100; i++) {
+            String data = jedis.get("custom-value-check:" + i);
+            if (data != null) {
+                Assertions.assertEquals("string", data);
+                count++;
+            }
+        }
+        Assertions.assertEquals(100, count);
+        for (int i = 0; i < 100; i++) {
+            jedis.del("custom-value-check:" + i);
+        }
+    }
+
+    @TestTemplate
+    public void testCustomValueForListWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/redis-to-redis-custom-value-for-list.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        List<String> list = jedis.lrange("custom-value-check-list", 0, -1);
+        Assertions.assertEquals(100, list.size());
+        jedis.del("custom-value-check-list");
+    }
+
+    @TestTemplate
+    public void testCustomValueForSetWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/redis-to-redis-custom-value-for-set.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        long amount = jedis.scard("custom-value-check-set");
+        Assertions.assertEquals(100, amount);
+        jedis.del("custom-value-check-set");
+    }
+
+    @TestTemplate
+    public void testCustomValueForZSetWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/redis-to-redis-custom-value-for-zset.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        long amount = jedis.zcard("custom-value-check-zset");
+        Assertions.assertEquals(100, amount);
+        jedis.del("custom-value-check-zset");
+    }
+
+    @TestTemplate
+    public void testCustomHashKeyAndValueWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/redis-to-redis-custom-hash-key-and-value.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        long amount = jedis.hlen("custom-hash-check");
+        Assertions.assertEquals(100, amount);
+        for (int i = 0; i < 100; i++) {
+            Assertions.assertEquals("string", jedis.hget("custom-hash-check", 
String.valueOf(i)));
+        }
+        jedis.del("custom-hash-check");
+    }
+
     public abstract RedisContainerInfo getRedisContainerInfo();
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf
new file mode 100644
index 0000000000..ecdbbaccea
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-hash-key-and-value.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-hash-check"
+    hash_key_field = "id"
+    hash_value_field = "c_string"
+    data_type = hash
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf
new file mode 100644
index 0000000000..aa1171546f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-key.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-key-check:{id}"
+    support_custom_key = true
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf
new file mode 100644
index 0000000000..05dfa51040
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-key.conf
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-value-check:{id}"
+    support_custom_key = true
+    value_field = "c_string"
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf
new file mode 100644
index 0000000000..260d7b010b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-list.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-value-check-list"
+    value_field = "c_string"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf
new file mode 100644
index 0000000000..28acbd70ca
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-set.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-value-check-set"
+    value_field = "id"
+    data_type = set
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf
new file mode 100644
index 0000000000..b862127c91
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-custom-value-for-zset.conf
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    format = "json"
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "custom-value-check-zset"
+    value_field = "id"
+    data_type = zset
+    batch_size = 33
+  }
+}
\ No newline at end of file

Reply via email to