This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6e8b7c5da5 [Feature][Redis] Add redis key into the result record
(#9574)
6e8b7c5da5 is described below
commit 6e8b7c5da563bf3decb44adbc977e015be5ae7c1
Author: dy102 <[email protected]>
AuthorDate: Fri Jul 25 22:27:36 2025 +0900
[Feature][Redis] Add redis key into the result record (#9574)
---
.github/workflows/backend.yml | 2 +-
docs/en/connector-v2/source/Redis.md | 86 +++++++++++---
.../seatunnel/redis/config/RedisParameters.java | 11 ++
.../seatunnel/redis/config/RedisSourceOptions.java | 21 ++++
.../seatunnel/redis/source/KeyedRecordReader.java | 102 +++++++++++++++++
.../seatunnel/redis/source/RedisRecordReader.java | 78 +++++++++++++
.../seatunnel/redis/source/RedisSourceFactory.java | 10 +-
.../seatunnel/redis/source/RedisSourceReader.java | 98 ++++------------
.../redis/source/UnKeyedRecordReader.java | 91 +++++++++++++++
.../seatunnel/redis/util/JsonKeyValueMerger.java | 85 ++++++++++++++
.../KeyValueMerger.java} | 19 +---
.../redis/util/KeyValueMergerFactory.java | 40 +++++++
.../connector/redis/RedisTestCaseTemplateIT.java | 115 +++++++++++++++++++
.../scan-list-to-redis-list-with-key.conf | 74 ++++++++++++
.../resources/scan-redis-to-redis-with-key.conf | 125 +++++++++++++++++++++
.../scan-set-to-redis-list-set-with-key.conf | 74 ++++++++++++
.../resources/scan-string-to-redis-with-key.conf | 74 ++++++++++++
.../scan-zset-to-redis-list-zset-with-key.conf | 74 ++++++++++++
18 files changed, 1069 insertions(+), 110 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 2672e0bf7d..d5e54feb82 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1419,7 +1419,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 120
+ timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 3833f35e25..3ea83cac43 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -19,22 +19,25 @@ Used to read data from Redis.
## Options
-| name | type | required | default value |
-| ------------------- | ------ |-----------------------| ------------- |
-| host | string | yes when mode=single | - |
-| port | int | no | 6379 |
-| keys | string | yes | - |
-| batch_size | int | yes | 10 |
-| data_type | string | yes | - |
-| user | string | no | - |
-| auth | string | no | - |
-| db_num | int | no | 0 |
-| mode | string | no | single |
-| hash_key_parse_mode | string | no | all |
-| nodes | list | yes when mode=cluster | - |
-| schema | config | yes when format=json | - |
-| format | string | no | json |
-| common-options | | no | - |
+| name | type | required | default
value |
+|---------------------| ------ |--------------------------------|
------------- |
+| host | string | yes when mode=single | -
|
+| port | int | no | 6379
|
+| keys | string | yes | -
|
+| read_key_enabled | boolean| no | false
|
+| key_field_name      | string | no                             | key
|
+| batch_size | int | yes | 10
|
+| data_type | string | yes | -
|
+| user | string | no | -
|
+| auth | string | no | -
|
+| db_num | int | no | 0
|
+| mode | string | no | single
|
+| hash_key_parse_mode | string | no | all
|
+| nodes | list | yes when mode=cluster | -
|
+| schema | config | yes when format=json | -
|
+| format | string | no | json
|
+| single_field_name | string | yes when read_key_enabled=true | -
|
+| common-options | | no | -
|
### host [string]
@@ -114,6 +117,37 @@ each kv that in hash key it will be treated as a row and
send it to upstream.
keys pattern
+### read_key_enabled [boolean]
+
+This option determines whether the Redis source connector includes the Redis
key in each output record when reading data.
+
+When set to `true`, both the key and its associated value are included in the
record.
+
+By default (`false`), only the value is read and included.
+
+If you are using a single-value Redis data type (such as `string`, `int`,
etc.) with `read_key_enabled = true`,
+you must also specify `single_field_name` to map the value to a schema column,
and `key_field_name` to map the Redis key.
+
+Note: When `read_key_enabled = true`, the schema configuration must explicitly
include the key field to correctly map the deserialized data.
+
+Example:
+```hocon
+schema {
+ fields {
+ key = string
+ value = string
+ }
+}
+```
+
+### key_field_name [string]
+
+Specifies the field name to store the Redis key in the output record when
`read_key_enabled = true`.
+
+If not set, the default field name `key` will be used.
+
+This field is useful when the default `key` field name conflicts with existing
schema fields, or if a more descriptive name is preferred.
+
### batch_size [int]
indicates the number of keys to attempt to return per iteration,default 10
@@ -224,6 +258,26 @@ connector will generate data as the following:
the schema fields of redis data
+### single_field_name [string]
+
+Specifies the field name for Redis values when `read_key_enabled = true` and
the value is a single primitive (e.g., `string`, `int`).
+
+This name is used in the schema to map the value field.
+
+**Note:** This option has no effect when reading complex Redis data types such
as hashes or objects that can be directly mapped to a schema.
+
+Example:
+```hocon
+read_key_enabled = true
+single_field_name = value
+schema {
+ fields {
+ key = string
+ value = string
+ }
+}
+```
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index c9e4337d79..3d64fa0e9c 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -55,6 +55,9 @@ public class RedisParameters implements Serializable {
private RedisDataType redisDataType;
private RedisBaseOptions.RedisMode mode;
private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
+ private Boolean readKeyEnabled;
+ private String singleFieldName;
+ private String keyFieldName;
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisSinkOptions.EXPIRE.defaultValue();
private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
@@ -74,6 +77,14 @@ public class RedisParameters implements Serializable {
this.dbNum = config.get(RedisBaseOptions.DB_NUM);
// set hash key mode
this.hashKeyParseMode =
config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
+ // set read with key
+ this.readKeyEnabled = config.get(RedisSourceOptions.READ_KEY_ENABLED);
+ // set single field name
+ if
(config.getOptional(RedisSourceOptions.SINGLE_FIELD_NAME).isPresent()) {
+ this.singleFieldName =
config.get(RedisSourceOptions.SINGLE_FIELD_NAME);
+ }
+ // set key name
+ this.keyFieldName = config.get(RedisSourceOptions.KEY_FIELD_NAME);
// set expire
this.expire = config.get(RedisSinkOptions.EXPIRE);
// set auth
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
index a02e113cea..673ec64d1d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
@@ -32,4 +32,25 @@ public class RedisSourceOptions extends RedisBaseOptions {
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default
value is all");
+
+ public static final Option<Boolean> READ_KEY_ENABLED =
+ Options.key("read_key_enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If set to true, the source connector reads Redis
values along with their keys.");
+
+ public static final Option<String> SINGLE_FIELD_NAME =
+ Options.key("single_field_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specifies the field name to be used in the output
row when reading single-value types "
+ + "(e.g., string, list, zset).");
+
+ public static final Option<String> KEY_FIELD_NAME =
+ Options.key("key_field_name")
+ .stringType()
+ .defaultValue("key")
+                    .withDescription("The field name used to store the
Redis key in the output record.");
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
new file mode 100644
index 0000000000..0fa164a301
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/KeyedRecordReader.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMerger;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class KeyedRecordReader extends RedisRecordReader {
+
+ private final KeyValueMerger keyValueMerger;
+
+ public KeyedRecordReader(
+ RedisParameters redisParameters,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ RedisClient redisClient,
+ KeyValueMerger keyValueMerger) {
+ super(redisParameters, deserializationSchema, redisClient);
+ this.keyValueMerger = keyValueMerger;
+ }
+
+ @Override
+ public void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> zSetList = redisClient.batchGetZset(keys);
+ for (int i = 0; i < zSetList.size(); i++) {
+ for (String value : zSetList.get(i)) {
+ pollValueToNext(keys.get(i), value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollSetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<Set<String>> setList = redisClient.batchGetSet(keys);
+ for (int i = 0; i < setList.size(); i++) {
+ for (String value : setList.get(i)) {
+ pollValueToNext(keys.get(i), value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollListToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> valueList = redisClient.batchGetList(keys);
+ for (int i = 0; i < valueList.size(); i++) {
+ for (String value : valueList.get(i)) {
+ pollValueToNext(keys.get(i), value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollStringToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<String> values = redisClient.batchGetString(keys);
+ for (int i = 0; i < values.size(); i++) {
+ pollValueToNext(keys.get(i), values.get(i), output);
+ }
+ }
+
+ private void pollValueToNext(String key, String value,
Collector<SeaTunnelRow> output)
+ throws IOException {
+ if (deserializationSchema == null) {
+ throw CommonError.illegalArgument(
+ "deserializationSchema is null",
+ "Redis source requires a deserialization schema to parse
the record with key: "
+ + key);
+ } else {
+ String parsed = keyValueMerger.parseWithKey(key, value);
+ deserializationSchema.deserialize(parsed.getBytes(), output);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
new file mode 100644
index 0000000000..398f381787
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisRecordReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public abstract class RedisRecordReader {
+ protected final RedisParameters redisParameters;
+ protected final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ protected RedisClient redisClient;
+
+ protected RedisRecordReader(
+ RedisParameters redisParameters,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ RedisClient redisClient) {
+ this.redisParameters = redisParameters;
+ this.deserializationSchema = deserializationSchema;
+ this.redisClient = redisClient;
+ }
+
+ public void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<Map<String, String>> values = redisClient.batchGetHash(keys);
+ if (deserializationSchema == null) {
+ for (Map<String, String> value : values) {
+ output.collect(new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(value)}));
+ }
+ return;
+ }
+ for (Map<String, String> recordsMap : values) {
+ if (redisParameters.getHashKeyParseMode() ==
RedisSourceOptions.HashKeyParseMode.KV) {
+ deserializationSchema.deserialize(
+ JsonUtils.toJsonString(recordsMap).getBytes(), output);
+ } else {
+ SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(recordsMap)});
+ output.collect(seaTunnelRow);
+ }
+ }
+ }
+
+ public abstract void pollZsetToNext(List<String> keys,
Collector<SeaTunnelRow> output)
+ throws IOException;
+
+ public abstract void pollSetToNext(List<String> keys,
Collector<SeaTunnelRow> output)
+ throws IOException;
+
+ public abstract void pollListToNext(List<String> keys,
Collector<SeaTunnelRow> output)
+ throws IOException;
+
+ public abstract void pollStringToNext(List<String> keys,
Collector<SeaTunnelRow> output)
+ throws IOException;
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
index fd8dae08c4..47b023a43d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceFactory.java
@@ -54,7 +54,10 @@ public class RedisSourceFactory implements
TableSourceFactory {
RedisSourceOptions.HASH_KEY_PARSE_MODE,
RedisBaseOptions.AUTH,
RedisBaseOptions.USER,
- RedisBaseOptions.KEY)
+ RedisBaseOptions.KEY,
+ RedisSourceOptions.READ_KEY_ENABLED,
+ RedisSourceOptions.SINGLE_FIELD_NAME,
+ RedisSourceOptions.KEY_FIELD_NAME)
.conditional(
RedisBaseOptions.MODE,
RedisBaseOptions.RedisMode.CLUSTER,
@@ -64,6 +67,11 @@ public class RedisSourceFactory implements
TableSourceFactory {
RedisBaseOptions.RedisMode.SINGLE,
RedisBaseOptions.HOST,
RedisBaseOptions.PORT)
+ .conditional(
+ RedisSourceOptions.READ_KEY_ENABLED,
+ true,
+ RedisSourceOptions.SINGLE_FIELD_NAME,
+ RedisSourceOptions.KEY_FIELD_NAME)
.bundled(RedisBaseOptions.FORMAT,
SinkConnectorCommonOptions.SCHEMA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index bdb887c097..ea8293ea5a 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -21,26 +21,25 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
-import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMergerFactory;
import org.apache.commons.collections4.CollectionUtils;
+import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
+@Slf4j
public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
{
private final RedisParameters redisParameters;
private final SingleSplitReaderContext context;
@@ -91,27 +90,41 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
private void pollNext(List<String> keys, RedisDataType dataType,
Collector<SeaTunnelRow> output)
throws IOException {
+ RedisRecordReader redisRecordReader;
+ if (Boolean.TRUE.equals(redisParameters.getReadKeyEnabled())) {
+ redisRecordReader =
+ new KeyedRecordReader(
+ redisParameters,
+ deserializationSchema,
+ redisClient,
+ KeyValueMergerFactory.createMerger(
+ deserializationSchema, redisParameters));
+ } else {
+ redisRecordReader =
+ new UnKeyedRecordReader(redisParameters,
deserializationSchema, redisClient);
+ }
+
if (CollectionUtils.isEmpty(keys)) {
return;
}
if (RedisDataType.HASH.equals(dataType)) {
- pollHashMapToNext(keys, output);
+ redisRecordReader.pollHashMapToNext(keys, output);
return;
}
if (RedisDataType.STRING.equals(dataType) ||
RedisDataType.KEY.equals(dataType)) {
- pollStringToNext(keys, output);
+ redisRecordReader.pollStringToNext(keys, output);
return;
}
if (RedisDataType.LIST.equals(dataType)) {
- pollListToNext(keys, output);
+ redisRecordReader.pollListToNext(keys, output);
return;
}
if (RedisDataType.SET.equals(dataType)) {
- pollSetToNext(keys, output);
+ redisRecordReader.pollSetToNext(keys, output);
return;
}
if (RedisDataType.ZSET.equals(dataType)) {
- pollZsetToNext(keys, output);
+ redisRecordReader.pollZsetToNext(keys, output);
return;
}
throw new RedisConnectorException(
@@ -119,73 +132,6 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
"UnSupport redisDataType,only support
string,list,hash,set,zset");
}
- private void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
- throws IOException {
- List<List<String>> zSetList = redisClient.batchGetZset(keys);
- for (List<String> values : zSetList) {
- for (String value : values) {
- pollValueToNext(value, output);
- }
- }
- }
-
- private void pollSetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
- throws IOException {
- List<Set<String>> setList = redisClient.batchGetSet(keys);
- for (Set<String> values : setList) {
- for (String value : values) {
- pollValueToNext(value, output);
- }
- }
- }
-
- private void pollListToNext(List<String> keys, Collector<SeaTunnelRow>
output)
- throws IOException {
- List<List<String>> valueList = redisClient.batchGetList(keys);
- for (List<String> values : valueList) {
- for (String value : values) {
- pollValueToNext(value, output);
- }
- }
- }
-
- private void pollStringToNext(List<String> keys, Collector<SeaTunnelRow>
output)
- throws IOException {
- List<String> values = redisClient.batchGetString(keys);
- for (String value : values) {
- pollValueToNext(value, output);
- }
- }
-
- private void pollValueToNext(String value, Collector<SeaTunnelRow> output)
throws IOException {
- if (deserializationSchema == null) {
- output.collect(new SeaTunnelRow(new Object[] {value}));
- } else {
- deserializationSchema.deserialize(value.getBytes(), output);
- }
- }
-
- private void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow>
output)
- throws IOException {
- List<Map<String, String>> values = redisClient.batchGetHash(keys);
- if (deserializationSchema == null) {
- for (Map<String, String> value : values) {
- output.collect(new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(value)}));
- }
- return;
- }
- for (Map<String, String> recordsMap : values) {
- if (redisParameters.getHashKeyParseMode() ==
RedisSourceOptions.HashKeyParseMode.KV) {
- deserializationSchema.deserialize(
- JsonUtils.toJsonString(recordsMap).getBytes(), output);
- } else {
- SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(recordsMap)});
- output.collect(seaTunnelRow);
- }
- }
- }
-
private RedisDataType resolveScanType(RedisDataType dataType) {
if (RedisDataType.KEY.equals(dataType)) {
return RedisDataType.STRING;
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
new file mode 100644
index 0000000000..8b7c605d5c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/UnKeyedRecordReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+public class UnKeyedRecordReader extends RedisRecordReader {
+
+ public UnKeyedRecordReader(
+ RedisParameters redisParameters,
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ RedisClient redisClient) {
+ super(redisParameters, deserializationSchema, redisClient);
+ }
+
+ @Override
+ public void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> zSetList = redisClient.batchGetZset(keys);
+ for (List<String> values : zSetList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollSetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<Set<String>> setList = redisClient.batchGetSet(keys);
+ for (Set<String> values : setList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollListToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> valueList = redisClient.batchGetList(keys);
+ for (List<String> values : valueList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ @Override
+ public void pollStringToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<String> values = redisClient.batchGetString(keys);
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+
+ private void pollValueToNext(String value, Collector<SeaTunnelRow> output)
throws IOException {
+ if (deserializationSchema == null) {
+ output.collect(new SeaTunnelRow(new Object[] {value}));
+ } else {
+ deserializationSchema.deserialize(value.getBytes(), output);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
new file mode 100644
index 0000000000..cacd9afaec
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/JsonKeyValueMerger.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class JsonKeyValueMerger implements KeyValueMerger {
+ private final RedisParameters redisParameters;
+
+ public JsonKeyValueMerger(RedisParameters redisParameters) {
+ this.redisParameters = redisParameters;
+ }
+
+ @Override
+ public String parseWithKey(String key, String value) {
+ ObjectNode objectNode = getObjectNode(key, value);
+ return objectNode.toString();
+ }
+
+ private ObjectNode getObjectNode(String key, String value) {
+ JsonNode node = JsonUtils.toJsonNode(value);
+ if (node.isTextual()) {
+ String text = node.textValue();
+ if (looksLikeJson(text)) {
+ try {
+ node = JsonUtils.parseObject(text);
+ } catch (Exception e) {
+ log.debug(
+ "Looks like JSON, but failed to parse JSON object
from text value: {}",
+ node.textValue());
+ }
+ }
+ }
+
+ ObjectNode objectNode;
+ if (node instanceof ObjectNode) {
+ objectNode = (ObjectNode) node;
+ } else {
+ objectNode = JsonUtils.createObjectNode();
+ setValueInNode(objectNode, node);
+ }
+ objectNode.put(redisParameters.getKeyFieldName(), key);
+ return objectNode;
+ }
+
+ public static boolean looksLikeJson(String text) {
+ return text != null
+ && ((text.startsWith("{") && text.endsWith("}"))
+ || (text.startsWith("[") && text.endsWith("]")));
+ }
+
+ private void setValueInNode(ObjectNode objectNode, JsonNode node) {
+ String singleFieldName = redisParameters.getSingleFieldName();
+ if (singleFieldName != null) {
+ objectNode.set(singleFieldName, node);
+ } else {
+ throw CommonError.illegalArgument(
+ "singleFieldName is null",
+ "You must specify 'single_field_name' when using a single
value with key-enabled schema.");
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
similarity index 54%
copy from
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
copy to
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
index a02e113cea..c2d6b88d43 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMerger.java
@@ -15,21 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.redis.config;
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class RedisSourceOptions extends RedisBaseOptions {
- public enum HashKeyParseMode {
- ALL,
- KV;
- }
-
- public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
- Options.key("hash_key_parse_mode")
- .enumType(HashKeyParseMode.class)
- .defaultValue(HashKeyParseMode.ALL)
- .withDescription(
- "hash key parse mode, support all or kv, default
value is all");
+public interface KeyValueMerger {
+ String parseWithKey(String key, String value);
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
new file mode 100644
index 0000000000..36fffcf4cb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/util/KeyValueMergerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.util;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+public class KeyValueMergerFactory {
+ private KeyValueMergerFactory() {}
+
+ public static KeyValueMerger createMerger(
+ DeserializationSchema<?> schema, RedisParameters redisParameters) {
+ if (schema == null) {
+ throw CommonError.illegalArgument(
+ "deserializationSchema is null",
+                    "Redis source requires a deserialization schema to parse the record with key");
+ }
+ if (schema instanceof JsonDeserializationSchema) {
+ return new JsonKeyValueMerger(redisParameters);
+ }
+        throw CommonError.unsupportedOperation("Redis", schema.getClass().getTypeName());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index bdc66016db..c66a9baedf 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -360,6 +360,121 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
}
}
+ @TestTemplate
+ public void testScanStringTypeWriteRedisWithKey(TestContainer container)
+ throws IOException, InterruptedException {
+ String keyPrefix = "string_test";
+ for (int i = 0; i < 1000; i++) {
+ jedis.set(keyPrefix + i, "val");
+ }
+ Container.ExecResult execResult =
+ container.executeJob("/scan-string-to-redis-with-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("string_test_list", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ for (int i = 0; i < 1000; i++) {
+ Assertions.assertTrue(list.get(i).contains("_suffix"));
+ }
+ jedis.del("string_test_list");
+ for (int i = 0; i < 1000; i++) {
+ jedis.del(keyPrefix + i);
+ }
+ }
+
+ @TestTemplate
+ public void testScanListTypeWriteRedisWithKey(TestContainer container)
+ throws IOException, InterruptedException {
+ String keyPrefix = "list-test-read";
+ for (int i = 0; i < 100; i++) {
+ String list = keyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.lpush(list, "val" + j);
+ }
+ }
+ Container.ExecResult execResult =
+ container.executeJob("/scan-list-to-redis-list-with-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("list-test-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ for (int i = 0; i < 1000; i++) {
+ Assertions.assertTrue(list.get(i).contains("_suffix"));
+ }
+ jedis.del("list-test-check");
+ for (int i = 0; i < 100; i++) {
+ String delKey = keyPrefix + i;
+ jedis.del(delKey);
+ }
+ }
+
+ @TestTemplate
+ public void testScanSetTypeWriteRedisWithKey(TestContainer container)
+ throws IOException, InterruptedException {
+ String setKeyPrefix = "key-test-set";
+ for (int i = 0; i < 100; i++) {
+ String setKey = setKeyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.sadd(setKey, j + "");
+ }
+ }
+ Container.ExecResult execResult =
+                container.executeJob("/scan-set-to-redis-list-set-with-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("key-set-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+
+ for (int i = 0; i < 1000; i++) {
+ Assertions.assertTrue(list.get(i).contains("_suffix"));
+ }
+
+ jedis.del("key-set-check");
+ for (int i = 0; i < 100; i++) {
+ String setKey = setKeyPrefix + i;
+ jedis.del(setKey);
+ }
+ }
+
+ @TestTemplate
+ public void testScanZsetTypeWriteRedisWithKey(TestContainer container)
+ throws IOException, InterruptedException {
+ String zSetKeyPrefix = "key-test-zset";
+ for (int i = 0; i < 100; i++) {
+ String key = zSetKeyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.zadd(key, 1, j + "");
+ }
+ }
+ Container.ExecResult execResult =
+                container.executeJob("/scan-zset-to-redis-list-zset-with-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("key-zset-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+
+ for (int i = 0; i < 1000; i++) {
+ Assertions.assertTrue(list.get(i).contains("_suffix"));
+ }
+
+ jedis.del("key-zset-check");
+ for (int i = 0; i < 100; i++) {
+ String key = zSetKeyPrefix + i;
+ jedis.del(key);
+ }
+ }
+
+ @TestTemplate
+ public void testCustomKeyWriteRedisWithKey(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/scan-redis-to-redis-with-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ for (int i = 0; i < 100; i++) {
+            Assertions.assertTrue(jedis.exists("redis-key-check:" + "key_test" + i));
+ }
+        for (int i = 0; i < 100; i++) {
+            jedis.del("redis-key-check:" + "key_test" + i);
+        }
+ }
+
@TestTemplate
public void testMultipletableRedisSink(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
new file mode 100644
index 0000000000..a7a70870ca
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-to-redis-list-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "list-test-read*"
+ data_type = list
+ batch_size = 33
+ read_key_enabled = true
+ key_field_name = custom_key
+ single_field_name = custom_value
+ format = json
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "custom_key"
+ type = "string"
+ },
+ {
+ name = "custom_value"
+ type = "string"
+ }
+ ]
+ }
+ }
+}
+
+transform {
+ Sql {
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM source_table"
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "list-test-check"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
new file mode 100644
index 0000000000..2510f7d4f4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-redis-to-redis-with-key.conf
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ read_key_enabled = true
+ key_field_name = key
+ single_field_name = value
+ format = json
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "key"
+ type = "string"
+ },
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "redis-key-check:{key}"
+ support_custom_key = true
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
new file mode 100644
index 0000000000..a68c7feb18
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key-test-set*"
+ data_type = set
+ batch_size = 33
+ read_key_enabled = true
+ key_field_name = custom_key
+ single_field_name = custom_value
+ format = json
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "custom_key"
+ type = "string"
+ },
+ {
+ name = "custom_value"
+ type = "string"
+ }
+ ]
+ }
+ }
+}
+
+transform {
+ Sql {
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM source_table"
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key-set-check"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
new file mode 100644
index 0000000000..fc9ac1e98f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "string_test*"
+ data_type = string
+ batch_size = 33
+ read_key_enabled = true
+ key_field_name = custom_key
+ single_field_name = custom_value
+ format = json
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "custom_key"
+ type = "string"
+ },
+ {
+ name = "custom_value"
+ type = "string"
+ }
+ ]
+ }
+ }
+}
+
+transform {
+ Sql {
+    query = "SELECT custom_key, CONCAT(custom_key, '_suffix') AS value FROM source_table"
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "string_test_list"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
new file mode 100644
index 0000000000..e70adc2cb1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-with-key.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key-test-zset*"
+ data_type = zset
+ batch_size = 33
+ read_key_enabled = true
+ key_field_name = custom_key
+ single_field_name = custom_value
+ format = json
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "custom_key"
+ type = "string"
+ },
+ {
+ name = "custom_value"
+ type = "string"
+ }
+ ]
+ }
+ }
+}
+
+transform {
+ Sql {
+    query = "SELECT custom_key, CONCAT(custom_value, '_suffix') AS value FROM source_table"
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key-zset-check"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file