This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6e8b7c5da5 [Feature][Redis] Add redis key into the result record (#9574)
6e8b7c5da5 is described below

commit 6e8b7c5da563bf3decb44adbc977e015be5ae7c1
Author: dy102 <[email protected]>
AuthorDate: Fri Jul 25 22:27:36 2025 +0900

    [Feature][Redis] Add redis key into the result record (#9574)
---
 .github/workflows/backend.yml                      |   2 +-
 docs/en/connector-v2/source/Redis.md               |  86 +++++++++++---
 .../seatunnel/redis/config/RedisParameters.java    |  11 ++
 .../seatunnel/redis/config/RedisSourceOptions.java |  21 ++++
 .../seatunnel/redis/source/KeyedRecordReader.java  | 102 +++++++++++++++++
 .../seatunnel/redis/source/RedisRecordReader.java  |  78 +++++++++++++
 .../seatunnel/redis/source/RedisSourceFactory.java |  10 +-
 .../seatunnel/redis/source/RedisSourceReader.java  |  98 ++++------------
 .../redis/source/UnKeyedRecordReader.java          |  91 +++++++++++++++
 .../seatunnel/redis/util/JsonKeyValueMerger.java   |  85 ++++++++++++++
 .../KeyValueMerger.java}                           |  19 +---
 .../redis/util/KeyValueMergerFactory.java          |  40 +++++++
 .../connector/redis/RedisTestCaseTemplateIT.java   | 115 +++++++++++++++++++
 .../scan-list-to-redis-list-with-key.conf          |  74 ++++++++++++
 .../resources/scan-redis-to-redis-with-key.conf    | 125 +++++++++++++++++++++
 .../scan-set-to-redis-list-set-with-key.conf       |  74 ++++++++++++
 .../resources/scan-string-to-redis-with-key.conf   |  74 ++++++++++++
 .../scan-zset-to-redis-list-zset-with-key.conf     |  74 ++++++++++++
 18 files changed, 1069 insertions(+), 110 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 2672e0bf7d..d5e54feb82 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1419,7 +1419,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 120
+    timeout-minutes: 180
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md
index 3833f35e25..3ea83cac43 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -19,22 +19,25 @@ Used to read data from Redis.
 
 ## Options
 
-| name                | type   | required              | default value |
-| ------------------- | ------ |-----------------------| ------------- |
-| host                | string | yes when mode=single  | -             |
-| port                | int    | no                    | 6379          |
-| keys                | string | yes                   | -             |
-| batch_size          | int    | yes                   | 10            |
-| data_type           | string | yes                   | -             |
-| user                | string | no                    | -             |
-| auth                | string | no                    | -             |
-| db_num              | int    | no                    | 0             |
-| mode                | string | no                    | single        |
-| hash_key_parse_mode | string | no                    | all           |
-| nodes               | list   | yes when mode=cluster | -             |
-| schema              | config | yes when format=json  | -             |
-| format              | string | no                    | json          |
-| common-options      |        | no                    | -             |
+| name                | type   | required                       | default value |
+|---------------------| ------ |--------------------------------| ------------- |
+| host                | string | yes when mode=single           | -             |
+| port                | int    | no                             | 6379          |
+| keys                | string | yes                            | -             |
+| read_key_enabled    | boolean| no                             | false         |
+| key_field_name      | string | yes when read_key_enabled=true | key           |
+| batch_size          | int    | yes                            | 10            |
+| data_type           | string | yes                            | -             |
+| user                | string | no                             | -             |
+| auth                | string | no                             | -             |
+| db_num              | int    | no                             | 0             |
+| mode                | string | no                             | single        |
+| hash_key_parse_mode | string | no                             | all           |
+| nodes               | list   | yes when mode=cluster          | -             |
+| schema              | config | yes when format=json           | -             |
+| format              | string | no                             | json          |
+| single_field_name   | string | yes when read_key_enabled=true | -             |
+| common-options      |        | no                             | -             |
 
 ### host [string]
 
@@ -114,6 +117,37 @@ each kv that in hash key it will be treated as a row and send it to upstream.
 
 keys pattern
 
+### read_key_enabled [boolean]
+
+This option determines whether the Redis source connector includes the Redis key in each output record when reading data.
+
+When set to `true`, both the key and its associated value are included in the record.
+
+By default (`false`), only the value is read and included.
+
+If you are using a single-value Redis data type (such as `string`, `int`, etc.) with `read_key_enabled = true`,
+you must also specify `single_field_name` to map the value to a schema column, and `key_field_name` to map the Redis key.
+
+Note: When `read_key_enabled = true`, the schema configuration must explicitly include the key field to correctly map the deserialized data.
+
+Example:
+```hocon
+schema {
+  fields {
+      key = string
+      value = string
+  }
+}
+```
+
+### key_field_name [string]
+
+Specifies the field name to store the Redis key in the output record when `read_key_enabled = true`.
+
+If not set, the default field name `key` will be used.
+
+This field is useful when the default `key` field name conflicts with existing schema fields, or if a more descriptive name is preferred.
+
 ### batch_size [int]
 
 indicates the number of keys to attempt to return per iteration,default 10
@@ -224,6 +258,26 @@ connector will generate data as the following:
 
 the schema fields of redis data
 
+### single_field_name [string]
+
+Specifies the field name for Redis values when `read_key_enabled = true` and the value is a single primitive (e.g., `string`, `int`).
+
+This name is used in the schema to map the value field.
+
+**Note:** This option has no effect when reading complex Redis data types such as hashes or objects that can be directly mapped to a schema.
+
+Example:
+```hocon
+read_key_enabled = true
+single_field_name = value
+schema {
+  fields {
+    key = string
+    value = string
+  }
+}
+```
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index c9e4337d79..3d64fa0e9c 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -55,6 +55,9 @@ public class RedisParameters implements Serializable {
     private RedisDataType redisDataType;
     private RedisBaseOptions.RedisMode mode;
     private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
+    private Boolean readKeyEnabled;
+    private String singleFieldName;
+    private String keyFieldName;
     private List<String> redisNodes = Collections.emptyList();
     private long expire = RedisSinkOptions.EXPIRE.defaultValue();
     private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
@@ -74,6 +77,14 @@ public class RedisParameters implements Serializable {
         this.dbNum = config.get(RedisBaseOptions.DB_NUM);
         // set hash key mode
         this.hashKeyParseMode = 
config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
+        // set read with key
+        this.readKeyEnabled = config.get(RedisSourceOptions.READ_KEY_ENABLED);
+        // set single field name
+        if 
(config.getOptional(RedisSourceOptions.SINGLE_FIELD_NAME).isPresent()) {
+            this.singleFieldName = 
config.get(RedisSourceOptions.SINGLE_FIELD_NAME);
+        }
+        // set key name
+        this.keyFieldName = config.get(RedisSourceOptions.KEY_FIELD_NAME);
         // set expire
         this.expire = config.get(RedisSinkOptions.EXPIRE);
         // set auth
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
index a02e113cea..673ec64d1d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
@@ -32,4 +32,25 @@ public class RedisSourceOptions extends RedisBaseOptions {
                     .defaultValue(HashKeyParseMode.ALL)
                     .withDescription(
                             "hash key parse mode, support all or kv, default 
value is all");
+
+    public static final Option<Boolean> READ_KEY_ENABLED =
+            Options.key("read_key_enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If set to true, the source connector reads Redis 
values along with their keys.");
+
+    public static final Option<String> SINGLE_FIELD_NAME =
+            Options.key("single_field_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies the field name to be used in the output 
row when reading single-value types "
+                                    + "(e.g., string, list, zset).");
+
+    public static final Option<String> KEY_FIELD_NAME =
+            Options.key("key_field_name")
+                    .stringType()
+                    .defaultValue("key")
+                    .withDescription("The value of key you want to write to 
redis.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
new file mode 100644
index 0000000000..0fa164a301
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMerger;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class KeyedRecordReader extends RedisRecordReader {
+
+    private final KeyValueMerger keyValueMerger;
+
+    public KeyedRecordReader(
+            RedisParameters redisParameters,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            RedisClient redisClient,
+            KeyValueMerger keyValueMerger) {
+        super(redisParameters, deserializationSchema, redisClient);
+        this.keyValueMerger = keyValueMerger;
+    }
+
+    @Override
+    public void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<List<String>> zSetList = redisClient.batchGetZset(keys);
+        for (int i = 0; i < zSetList.size(); i++) {
+            for (String value : zSetList.get(i)) {
+                pollValueToNext(keys.get(i), value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<Set<String>> setList = redisClient.batchGetSet(keys);
+        for (int i = 0; i < setList.size(); i++) {
+            for (String value : setList.get(i)) {
+                pollValueToNext(keys.get(i), value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollListToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<List<String>> valueList = redisClient.batchGetList(keys);
+        for (int i = 0; i < valueList.size(); i++) {
+            for (String value : valueList.get(i)) {
+                pollValueToNext(keys.get(i), value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<String> values = redisClient.batchGetString(keys);
+        for (int i = 0; i < values.size(); i++) {
+            pollValueToNext(keys.get(i), values.get(i), output);
+        }
+    }
+
+    private void pollValueToNext(String key, String value, 
Collector<SeaTunnelRow> output)
+            throws IOException {
+        if (deserializationSchema == null) {
+            throw CommonError.illegalArgument(
+                    "deserializationSchema is null",
+                    "Redis source requires a deserialization schema to parse 
the record with key: "
+                            + key);
+        } else {
+            String parsed = keyValueMerger.parseWithKey(key, value);
+            deserializationSchema.deserialize(parsed.getBytes(), output);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
new file mode 100644
index 0000000000..398f381787
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public abstract class RedisRecordReader {
+    protected final RedisParameters redisParameters;
+    protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    protected RedisClient redisClient;
+
+    protected RedisRecordReader(
+            RedisParameters redisParameters,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            RedisClient redisClient) {
+        this.redisParameters = redisParameters;
+        this.deserializationSchema = deserializationSchema;
+        this.redisClient = redisClient;
+    }
+
+    public void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<Map<String, String>> values = redisClient.batchGetHash(keys);
+        if (deserializationSchema == null) {
+            for (Map<String, String> value : values) {
+                output.collect(new SeaTunnelRow(new Object[] 
{JsonUtils.toJsonString(value)}));
+            }
+            return;
+        }
+        for (Map<String, String> recordsMap : values) {
+            if (redisParameters.getHashKeyParseMode() == 
RedisSourceOptions.HashKeyParseMode.KV) {
+                deserializationSchema.deserialize(
+                        JsonUtils.toJsonString(recordsMap).getBytes(), output);
+            } else {
+                SeaTunnelRow seaTunnelRow =
+                        new SeaTunnelRow(new Object[] 
{JsonUtils.toJsonString(recordsMap)});
+                output.collect(seaTunnelRow);
+            }
+        }
+    }
+
+    public abstract void pollZsetToNext(List<String> keys, 
Collector<SeaTunnelRow> output)
+            throws IOException;
+
+    public abstract void pollSetToNext(List<String> keys, 
Collector<SeaTunnelRow> output)
+            throws IOException;
+
+    public abstract void pollListToNext(List<String> keys, 
Collector<SeaTunnelRow> output)
+            throws IOException;
+
+    public abstract void pollStringToNext(List<String> keys, 
Collector<SeaTunnelRow> output)
+            throws IOException;
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index fd8dae08c4..47b023a43d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -54,7 +54,10 @@ public class RedisSourceFactory implements 
TableSourceFactory {
                         RedisSourceOptions.HASH_KEY_PARSE_MODE,
                         RedisBaseOptions.AUTH,
                         RedisBaseOptions.USER,
-                        RedisBaseOptions.KEY)
+                        RedisBaseOptions.KEY,
+                        RedisSourceOptions.READ_KEY_ENABLED,
+                        RedisSourceOptions.SINGLE_FIELD_NAME,
+                        RedisSourceOptions.KEY_FIELD_NAME)
                 .conditional(
                         RedisBaseOptions.MODE,
                         RedisBaseOptions.RedisMode.CLUSTER,
@@ -64,6 +67,11 @@ public class RedisSourceFactory implements 
TableSourceFactory {
                         RedisBaseOptions.RedisMode.SINGLE,
                         RedisBaseOptions.HOST,
                         RedisBaseOptions.PORT)
+                .conditional(
+                        RedisSourceOptions.READ_KEY_ENABLED,
+                        true,
+                        RedisSourceOptions.SINGLE_FIELD_NAME,
+                        RedisSourceOptions.KEY_FIELD_NAME)
                 .bundled(RedisBaseOptions.FORMAT, 
SinkConnectorCommonOptions.SCHEMA)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index bdb887c097..ea8293ea5a 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -21,26 +21,25 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
-import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMergerFactory;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import lombok.extern.slf4j.Slf4j;
 import redis.clients.jedis.params.ScanParams;
 import redis.clients.jedis.resps.ScanResult;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
+@Slf4j
 public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> 
{
     private final RedisParameters redisParameters;
     private final SingleSplitReaderContext context;
@@ -91,27 +90,41 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
 
     private void pollNext(List<String> keys, RedisDataType dataType, 
Collector<SeaTunnelRow> output)
             throws IOException {
+        RedisRecordReader redisRecordReader;
+        if (Boolean.TRUE.equals(redisParameters.getReadKeyEnabled())) {
+            redisRecordReader =
+                    new KeyedRecordReader(
+                            redisParameters,
+                            deserializationSchema,
+                            redisClient,
+                            KeyValueMergerFactory.createMerger(
+                                    deserializationSchema, redisParameters));
+        } else {
+            redisRecordReader =
+                    new UnKeyedRecordReader(redisParameters, 
deserializationSchema, redisClient);
+        }
+
         if (CollectionUtils.isEmpty(keys)) {
             return;
         }
         if (RedisDataType.HASH.equals(dataType)) {
-            pollHashMapToNext(keys, output);
+            redisRecordReader.pollHashMapToNext(keys, output);
             return;
         }
         if (RedisDataType.STRING.equals(dataType) || 
RedisDataType.KEY.equals(dataType)) {
-            pollStringToNext(keys, output);
+            redisRecordReader.pollStringToNext(keys, output);
             return;
         }
         if (RedisDataType.LIST.equals(dataType)) {
-            pollListToNext(keys, output);
+            redisRecordReader.pollListToNext(keys, output);
             return;
         }
         if (RedisDataType.SET.equals(dataType)) {
-            pollSetToNext(keys, output);
+            redisRecordReader.pollSetToNext(keys, output);
             return;
         }
         if (RedisDataType.ZSET.equals(dataType)) {
-            pollZsetToNext(keys, output);
+            redisRecordReader.pollZsetToNext(keys, output);
             return;
         }
         throw new RedisConnectorException(
@@ -119,73 +132,6 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
                 "UnSupport redisDataType,only support 
string,list,hash,set,zset");
     }
 
-    private void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
-            throws IOException {
-        List<List<String>> zSetList = redisClient.batchGetZset(keys);
-        for (List<String> values : zSetList) {
-            for (String value : values) {
-                pollValueToNext(value, output);
-            }
-        }
-    }
-
-    private void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
-            throws IOException {
-        List<Set<String>> setList = redisClient.batchGetSet(keys);
-        for (Set<String> values : setList) {
-            for (String value : values) {
-                pollValueToNext(value, output);
-            }
-        }
-    }
-
-    private void pollListToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
-            throws IOException {
-        List<List<String>> valueList = redisClient.batchGetList(keys);
-        for (List<String> values : valueList) {
-            for (String value : values) {
-                pollValueToNext(value, output);
-            }
-        }
-    }
-
-    private void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
-            throws IOException {
-        List<String> values = redisClient.batchGetString(keys);
-        for (String value : values) {
-            pollValueToNext(value, output);
-        }
-    }
-
-    private void pollValueToNext(String value, Collector<SeaTunnelRow> output) 
throws IOException {
-        if (deserializationSchema == null) {
-            output.collect(new SeaTunnelRow(new Object[] {value}));
-        } else {
-            deserializationSchema.deserialize(value.getBytes(), output);
-        }
-    }
-
-    private void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
-            throws IOException {
-        List<Map<String, String>> values = redisClient.batchGetHash(keys);
-        if (deserializationSchema == null) {
-            for (Map<String, String> value : values) {
-                output.collect(new SeaTunnelRow(new Object[] 
{JsonUtils.toJsonString(value)}));
-            }
-            return;
-        }
-        for (Map<String, String> recordsMap : values) {
-            if (redisParameters.getHashKeyParseMode() == 
RedisSourceOptions.HashKeyParseMode.KV) {
-                deserializationSchema.deserialize(
-                        JsonUtils.toJsonString(recordsMap).getBytes(), output);
-            } else {
-                SeaTunnelRow seaTunnelRow =
-                        new SeaTunnelRow(new Object[] 
{JsonUtils.toJsonString(recordsMap)});
-                output.collect(seaTunnelRow);
-            }
-        }
-    }
-
     private RedisDataType resolveScanType(RedisDataType dataType) {
         if (RedisDataType.KEY.equals(dataType)) {
             return RedisDataType.STRING;
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
new file mode 100644
index 0000000000..8b7c605d5c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class UnKeyedRecordReader extends RedisRecordReader {
+
+    public UnKeyedRecordReader(
+            RedisParameters redisParameters,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            RedisClient redisClient) {
+        super(redisParameters, deserializationSchema, redisClient);
+    }
+
+    @Override
+    public void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<List<String>> zSetList = redisClient.batchGetZset(keys);
+        for (List<String> values : zSetList) {
+            for (String value : values) {
+                pollValueToNext(value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<Set<String>> setList = redisClient.batchGetSet(keys);
+        for (Set<String> values : setList) {
+            for (String value : values) {
+                pollValueToNext(value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollListToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<List<String>> valueList = redisClient.batchGetList(keys);
+        for (List<String> values : valueList) {
+            for (String value : values) {
+                pollValueToNext(value, output);
+            }
+        }
+    }
+
+    @Override
+    public void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> 
output)
+            throws IOException {
+        List<String> values = redisClient.batchGetString(keys);
+        for (String value : values) {
+            pollValueToNext(value, output);
+        }
+    }
+
+    private void pollValueToNext(String value, Collector<SeaTunnelRow> output) 
throws IOException {
+        if (deserializationSchema == null) {
+            output.collect(new SeaTunnelRow(new Object[] {value}));
+        } else {
+            deserializationSchema.deserialize(value.getBytes(), output);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
new file mode 100644
index 0000000000..cacd9afaec
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * {@link KeyValueMerger} that returns a JSON object string containing both the value and the key
+ * it was read from.
+ *
+ * <p>If the value resolves to a JSON object, the key is added to that object. Any other value is
+ * first wrapped in a new object under {@code redisParameters.getSingleFieldName()}. The key is
+ * always stored under {@code redisParameters.getKeyFieldName()}.
+ */
+@Slf4j
+public class JsonKeyValueMerger implements KeyValueMerger {
+    private final RedisParameters redisParameters;
+
+    public JsonKeyValueMerger(RedisParameters redisParameters) {
+        this.redisParameters = redisParameters;
+    }
+
+    /**
+     * Builds the merged JSON object for {@code key}/{@code value} and serializes it.
+     *
+     * @param key key to embed in the result
+     * @param value value that was read for {@code key}
+     * @return the merged object rendered with {@code ObjectNode#toString()}
+     */
+    @Override
+    public String parseWithKey(String key, String value) {
+        return getObjectNode(key, value).toString();
+    }
+
+    /**
+     * Turns {@code value} into an {@link ObjectNode} and adds the key field to it.
+     *
+     * <p>NOTE(review): the result for a {@code null} value depends on what {@code
+     * JsonUtils.toJsonNode} returns for {@code null}, which is not visible here - confirm.
+     */
+    private ObjectNode getObjectNode(String key, String value) {
+        JsonNode node = JsonUtils.toJsonNode(value);
+        if (node.isTextual()) {
+            String text = node.textValue();
+            if (looksLikeJson(text)) {
+                try {
+                    node = JsonUtils.parseObject(text);
+                } catch (Exception e) {
+                    // Best effort: keep the text node and fall through to the single-field
+                    // wrapping below. The exception is passed to the logger so the cause of
+                    // the failed parse is not lost.
+                    log.debug(
+                            "Looks like JSON, but failed to parse JSON object from text value: {}",
+                            text,
+                            e);
+                }
+            }
+        }
+
+        ObjectNode objectNode;
+        if (node instanceof ObjectNode) {
+            objectNode = (ObjectNode) node;
+        } else {
+            objectNode = JsonUtils.createObjectNode();
+            setValueInNode(objectNode, node);
+        }
+        // Added last, so a field with the same name coming from the value is replaced by the key.
+        objectNode.put(redisParameters.getKeyFieldName(), key);
+        return objectNode;
+    }
+
+    /**
+     * Cheap syntactic check: {@code true} when {@code text} is non-null and is delimited by
+     * <code>{...}</code> or {@code [...]}. Surrounding whitespace is not trimmed and the content
+     * is not validated.
+     */
+    public static boolean looksLikeJson(String text) {
+        return text != null
+                && ((text.startsWith("{") && text.endsWith("}"))
+                        || (text.startsWith("[") && text.endsWith("]")));
+    }
+
+    /**
+     * Stores a non-object value in {@code objectNode} under the configured single field name.
+     * Raises the error built by {@code CommonError.illegalArgument} when no single field name is
+     * configured.
+     */
+    private void setValueInNode(ObjectNode objectNode, JsonNode node) {
+        String singleFieldName = redisParameters.getSingleFieldName();
+        if (singleFieldName != null) {
+            objectNode.set(singleFieldName, node);
+        } else {
+            throw CommonError.illegalArgument(
+                    "singleFieldName is null",
+                    "You must specify 'single_field_name' when using a single value with key-enabled schema.");
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
similarity index 54%
copy from 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
copy to 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
index a02e113cea..c2d6b88d43 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
@@ -15,21 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.redis.config;
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class RedisSourceOptions extends RedisBaseOptions {
-    public enum HashKeyParseMode {
-        ALL,
-        KV;
-    }
-
-    public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
-            Options.key("hash_key_parse_mode")
-                    .enumType(HashKeyParseMode.class)
-                    .defaultValue(HashKeyParseMode.ALL)
-                    .withDescription(
-                            "hash key parse mode, support all or kv, default 
value is all");
+/** Combines a Redis key with the value read for that key into a single string record. */
+public interface KeyValueMerger {
+    /**
+     * Merges {@code key} into {@code value}.
+     *
+     * @param key the key the value belongs to
+     * @param value the value read for {@code key}
+     * @return the merged record; its format is defined by the implementation
+     */
+    String parseWithKey(String key, String value);
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
new file mode 100644
index 0000000000..36fffcf4cb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+/** Static factory that picks the {@link KeyValueMerger} matching a deserialization schema. */
+public class KeyValueMergerFactory {
+    private KeyValueMergerFactory() {}
+
+    /**
+     * Returns the merger for {@code schema}.
+     *
+     * <p>Only {@link JsonDeserializationSchema} is handled. A {@code null} schema raises the error
+     * built by {@code CommonError.illegalArgument}; any other schema type raises the error built
+     * by {@code CommonError.unsupportedOperation}.
+     *
+     * @param schema deserialization schema used by the source; must not be {@code null}
+     * @param redisParameters parameters passed on to the created merger
+     * @return a merger suitable for {@code schema}
+     */
+    public static KeyValueMerger createMerger(
+            DeserializationSchema<?> schema, RedisParameters redisParameters) {
+        // instanceof is false for null, so the null case falls through to the checks below.
+        if (schema instanceof JsonDeserializationSchema) {
+            return new JsonKeyValueMerger(redisParameters);
+        }
+        if (schema != null) {
+            throw CommonError.unsupportedOperation("Redis", schema.getClass().getTypeName());
+        }
+        throw CommonError.illegalArgument(
+                "deserializationSchema is null",
+                "Redis source requires a deserialization schema to parse the record with key");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index bdc66016db..c66a9baedf 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -360,6 +360,121 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
         }
     }
 
+    /**
+     * Runs {@code /scan-string-to-redis-with-key.conf} against 1000 seeded string keys and checks
+     * the rows written to the {@code string_test_list} list.
+     */
+    @TestTemplate
+    public void testScanStringTypeWriteRedisWithKey(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sourceKeyPrefix = "string_test";
+        final String sinkListKey = "string_test_list";
+        final int keyCount = 1000;
+
+        // Seed the source keys; every key holds the same plain string.
+        for (int idx = 0; idx < keyCount; idx++) {
+            jedis.set(sourceKeyPrefix + idx, "val");
+        }
+
+        Container.ExecResult jobResult =
+                container.executeJob("/scan-string-to-redis-with-key.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // One sink row per source key, each carrying the "_suffix" marker.
+        List<String> sinkRows = jedis.lrange(sinkListKey, 0, -1);
+        Assertions.assertEquals(keyCount, sinkRows.size());
+        for (String row : sinkRows) {
+            Assertions.assertTrue(row.contains("_suffix"));
+        }
+
+        // Remove the sink list and the seeded keys again.
+        jedis.del(sinkListKey);
+        for (int idx = 0; idx < keyCount; idx++) {
+            jedis.del(sourceKeyPrefix + idx);
+        }
+    }
+
+    /**
+     * Runs {@code /scan-list-to-redis-list-with-key.conf} against 100 seeded lists of 10 elements
+     * each and checks the rows written to the {@code list-test-check} list.
+     */
+    @TestTemplate
+    public void testScanListTypeWriteRedisWithKey(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sourceKeyPrefix = "list-test-read";
+        final String sinkListKey = "list-test-check";
+        final int keyCount = 100;
+        final int elementsPerKey = 10;
+
+        // Seed the source lists.
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            String sourceKey = sourceKeyPrefix + keyIdx;
+            for (int elementIdx = 0; elementIdx < elementsPerKey; elementIdx++) {
+                jedis.lpush(sourceKey, "val" + elementIdx);
+            }
+        }
+
+        Container.ExecResult jobResult =
+                container.executeJob("/scan-list-to-redis-list-with-key.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // One sink row per source element, each carrying the "_suffix" marker.
+        List<String> sinkRows = jedis.lrange(sinkListKey, 0, -1);
+        Assertions.assertEquals(keyCount * elementsPerKey, sinkRows.size());
+        for (String row : sinkRows) {
+            Assertions.assertTrue(row.contains("_suffix"));
+        }
+
+        // Remove the sink list and the seeded lists again.
+        jedis.del(sinkListKey);
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            jedis.del(sourceKeyPrefix + keyIdx);
+        }
+    }
+
+    /**
+     * Runs {@code /scan-set-to-redis-list-set-with-key.conf} against 100 seeded sets of 10 members
+     * each and checks the rows written to the {@code key-set-check} list.
+     */
+    @TestTemplate
+    public void testScanSetTypeWriteRedisWithKey(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sourceKeyPrefix = "key-test-set";
+        final String sinkListKey = "key-set-check";
+        final int keyCount = 100;
+        final int membersPerKey = 10;
+
+        // Seed the source sets with the members "0".."9".
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            String sourceKey = sourceKeyPrefix + keyIdx;
+            for (int member = 0; member < membersPerKey; member++) {
+                jedis.sadd(sourceKey, String.valueOf(member));
+            }
+        }
+
+        Container.ExecResult jobResult =
+                container.executeJob("/scan-set-to-redis-list-set-with-key.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // One sink row per source member, each carrying the "_suffix" marker.
+        List<String> sinkRows = jedis.lrange(sinkListKey, 0, -1);
+        Assertions.assertEquals(keyCount * membersPerKey, sinkRows.size());
+        for (String row : sinkRows) {
+            Assertions.assertTrue(row.contains("_suffix"));
+        }
+
+        // Remove the sink list and the seeded sets again.
+        jedis.del(sinkListKey);
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            jedis.del(sourceKeyPrefix + keyIdx);
+        }
+    }
+
+    /**
+     * Runs {@code /scan-zset-to-redis-list-zset-with-key.conf} against 100 seeded sorted sets of
+     * 10 members each and checks the rows written to the {@code key-zset-check} list.
+     */
+    @TestTemplate
+    public void testScanZsetTypeWriteRedisWithKey(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sourceKeyPrefix = "key-test-zset";
+        final String sinkListKey = "key-zset-check";
+        final int keyCount = 100;
+        final int membersPerKey = 10;
+
+        // Seed the source sorted sets; every member "0".."9" gets the score 1.
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            String sourceKey = sourceKeyPrefix + keyIdx;
+            for (int member = 0; member < membersPerKey; member++) {
+                jedis.zadd(sourceKey, 1, String.valueOf(member));
+            }
+        }
+
+        Container.ExecResult jobResult =
+                container.executeJob("/scan-zset-to-redis-list-zset-with-key.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode());
+
+        // One sink row per source member, each carrying the "_suffix" marker.
+        List<String> sinkRows = jedis.lrange(sinkListKey, 0, -1);
+        Assertions.assertEquals(keyCount * membersPerKey, sinkRows.size());
+        for (String row : sinkRows) {
+            Assertions.assertTrue(row.contains("_suffix"));
+        }
+
+        // Remove the sink list and the seeded sorted sets again.
+        jedis.del(sinkListKey);
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            jedis.del(sourceKeyPrefix + keyIdx);
+        }
+    }
+
+    /**
+     * Runs {@code /scan-redis-to-redis-with-key.conf} and checks that the sink created one key
+     * per source key, named {@code redis-key-check:key_test<i>}.
+     *
+     * <p>NOTE(review): this test does not seed {@code key_test0..99} itself; presumably the shared
+     * test setup creates them - confirm.
+     */
+    @TestTemplate
+    public void testCustomKeyWriteRedisWithKey(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/scan-redis-to-redis-with-key.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // The sink key template is "redis-key-check:{key}", so each sink key embeds the full
+        // source key name, not just its numeric index.
+        String sinkKeyPrefix = "redis-key-check:key_test";
+        for (int i = 0; i < 100; i++) {
+            Assertions.assertTrue(jedis.exists(sinkKeyPrefix + i));
+        }
+        // Clean up exactly the keys asserted above so they do not leak into later tests.
+        for (int i = 0; i < 100; i++) {
+            jedis.del(sinkKeyPrefix + i);
+        }
+    }
+
     @TestTemplate
     public void testMultipletableRedisSink(TestContainer container)
             throws IOException, InterruptedException {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
new file mode 100644
index 0000000000..a7a70870ca
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# E2E job used by testScanListTypeWriteRedisWithKey: reads Redis lists together with their keys
+# and writes the transformed rows into the single list "list-test-check".
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    # base64 of "SeaTunnel"; presumably decoded because of shade.identifier above - confirm
+    auth = "U2VhVHVubmVs"
+    keys = "list-test-read*"
+    data_type = list
+    batch_size = 33
+    # Put the Redis key into each record under key_field_name; values that are not JSON
+    # objects are stored under single_field_name.
+    read_key_enabled = true
+    key_field_name = custom_key
+    single_field_name = custom_value
+    format = json
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "custom_key"
+          type = "string"
+        },
+        {
+          name = "custom_value"
+          type = "string"
+        }
+      ]
+    }
+  }
+}
+
+transform {
+  Sql {
+    # NOTE(review): the suffix is appended to custom_key, so custom_value is never checked
+    # by the test; the zset variant of this job uses custom_value instead.
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM 
source_table"
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "list-test-check"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
new file mode 100644
index 0000000000..2510f7d4f4
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# E2E job used by testCustomKeyWriteRedisWithKey: reads string keys matching "key_test*" together
+# with their key names and writes each row to a sink key derived from the source key.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    # base64 of "SeaTunnel"; presumably decoded because of shade.identifier above - confirm
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+    # Put the Redis key into each record under the "key" column.
+    read_key_enabled = true
+    key_field_name = key
+    # NOTE(review): the schema below declares no "value" column; presumably the stored
+    # values are JSON objects, so single_field_name is never used here - confirm.
+    single_field_name = value
+    format = json
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "key"
+          type = "string"
+        },
+        {
+          name = "id"
+          type = "bigint"
+        },
+        {
+          name = "c_map"
+          type = "map<string, smallint>"
+        },
+        {
+          name = "c_array"
+          type = "array<tinyint>"
+        },
+        {
+          name = "c_string"
+          type = "string"
+        },
+        {
+          name = "c_boolean"
+          type = "boolean"
+        },
+        {
+          name = "c_tinyint"
+          type = "tinyint"
+        },
+        {
+          name = "c_smallint"
+          type = "smallint"
+        },
+        {
+          name = "c_int"
+          type = "int"
+        },
+        {
+          name = "c_bigint"
+          type = "bigint"
+        },
+        {
+          name = "c_float"
+          type = "float"
+        },
+        {
+          name = "c_double"
+          type = "double"
+        },
+        {
+          name = "c_decimal"
+          type = "decimal(2,1)"
+        },
+        {
+          name = "c_bytes"
+          type = "bytes"
+        },
+        {
+          name = "c_date"
+          type = "date"
+        },
+        {
+          name = "c_timestamp"
+          type = "timestamp"
+        }
+      ]
+    }
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    # {key} is filled from the row's "key" column; the test expects sink keys named
+    # redis-key-check:key_test<i>.
+    key = "redis-key-check:{key}"
+    support_custom_key = true
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
new file mode 100644
index 0000000000..a68c7feb18
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# E2E job used by testScanSetTypeWriteRedisWithKey: reads Redis sets together with their keys
+# and writes the transformed rows into the single list "key-set-check".
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    # base64 of "SeaTunnel"; presumably decoded because of shade.identifier above - confirm
+    auth = "U2VhVHVubmVs"
+    keys = "key-test-set*"
+    data_type = set
+    batch_size = 33
+    # Put the Redis key into each record under key_field_name; values that are not JSON
+    # objects are stored under single_field_name.
+    read_key_enabled = true
+    key_field_name = custom_key
+    single_field_name = custom_value
+    format = json
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "custom_key"
+          type = "string"
+        },
+        {
+          name = "custom_value"
+          type = "string"
+        }
+      ]
+    }
+  }
+}
+
+transform {
+  Sql {
+    # NOTE(review): the suffix is appended to custom_key, so custom_value is never checked
+    # by the test; the zset variant of this job uses custom_value instead.
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM 
source_table"
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key-set-check"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
new file mode 100644
index 0000000000..fc9ac1e98f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# E2E job used by testScanStringTypeWriteRedisWithKey: reads Redis string keys together with
+# their key names and writes the transformed rows into the single list "string_test_list".
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    # base64 of "SeaTunnel"; presumably decoded because of shade.identifier above - confirm
+    auth = "U2VhVHVubmVs"
+    # NOTE(review): this pattern also matches the sink key "string_test_list" - confirm the
+    # source ignores keys of a different data type.
+    keys = "string_test*"
+    data_type = string
+    batch_size = 33
+    # Put the Redis key into each record under key_field_name; values that are not JSON
+    # objects are stored under single_field_name.
+    read_key_enabled = true
+    key_field_name = custom_key
+    single_field_name = custom_value
+    format = json
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "custom_key"
+          type = "string"
+        },
+        {
+          name = "custom_value"
+          type = "string"
+        }
+      ]
+    }
+  }
+}
+
+transform {
+  Sql {
+    # NOTE(review): the suffix is appended to custom_key, so custom_value is never checked
+    # by the test; the zset variant of this job uses custom_value instead.
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM 
source_table"
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "string_test_list"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
new file mode 100644
index 0000000000..e70adc2cb1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# E2E job used by testScanZsetTypeWriteRedisWithKey: reads Redis sorted sets together with their
+# keys and writes the transformed rows into the single list "key-zset-check".
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    # base64 of "SeaTunnel"; presumably decoded because of shade.identifier above - confirm
+    auth = "U2VhVHVubmVs"
+    keys = "key-test-zset*"
+    data_type = zset
+    batch_size = 33
+    # Put the Redis key into each record under key_field_name; values that are not JSON
+    # objects are stored under single_field_name.
+    read_key_enabled = true
+    key_field_name = custom_key
+    single_field_name = custom_value
+    format = json
+    schema = {
+      table = "RedisDatabase.RedisTable"
+      columns = [
+        {
+          name = "custom_key"
+          type = "string"
+        },
+        {
+          name = "custom_value"
+          type = "string"
+        }
+      ]
+    }
+  }
+}
+
+transform {
+  Sql {
+    # Unlike the list/set/string variants, the suffix is appended to custom_value here.
+    query = "SELECT custom_key, CONCAT(custom_value, '_suffix') AS value FROM 
source_table"
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key-zset-check"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file

Reply via email to