This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new be37f05c07 [Improve][Redis] Redis reader use scan cammnd instead of 
keys, single mode reader/writer support batch (#7087)
be37f05c07 is described below

commit be37f05c07eecccdb3805545080d1b61a6c10755
Author: FuYouJ <[email protected]>
AuthorDate: Wed Jul 3 22:17:10 2024 +0800

    [Improve][Redis] Redis reader use scan cammnd instead of keys, single mode 
reader/writer support batch (#7087)
---
 docs/en/connector-v2/sink/Redis.md                 |   5 +
 docs/en/connector-v2/source/Redis.md               |   5 +
 release-note.md                                    |   1 +
 .../seatunnel/redis/client/RedisClient.java        |  77 ++++++++
 .../seatunnel/redis/client/RedisClusterClient.java | 138 +++++++++++++
 .../seatunnel/redis/client/RedisSingleClient.java  | 216 +++++++++++++++++++++
 .../seatunnel/redis/config/RedisConfig.java        |  10 +-
 .../seatunnel/redis/config/RedisDataType.java      |  12 ++
 .../seatunnel/redis/config/RedisParameters.java    |  15 ++
 .../seatunnel/redis/sink/RedisSinkWriter.java      |  65 ++++++-
 .../seatunnel/redis/source/RedisSourceReader.java  | 162 ++++++++++++----
 .../seatunnel/e2e/connector/redis/RedisIT.java     | 113 +++++++++++
 .../test/resources/redis-to-redis-by-db-num.conf   |   2 +
 .../src/test/resources/redis-to-redis-expire.conf  |   1 +
 .../src/test/resources/redis-to-redis.conf         |   4 +-
 ...onf => scan-hash-to-redis-list-hash-check.conf} |  10 +-
 ...n-list-test-read-to-redis-list-test-check.conf} |  10 +-
 ....conf => scan-set-to-redis-list-set-check.conf} |  10 +-
 ...is-by-db-num.conf => scan-string-to-redis.conf} |  10 +-
 ...onf => scan-zset-to-redis-list-zset-check.conf} |  10 +-
 20 files changed, 806 insertions(+), 70 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index f91e6bc6ec..ac4cd55cc4 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -18,6 +18,7 @@ Used to write data to Redis.
 | port           | int    | yes                   | -             |
 | key            | string | yes                   | -             |
 | data_type      | string | yes                   | -             |
+| batch_size     | int    | no                    | 10            |
 | user           | string | no                    | -             |
 | auth           | string | no                    | -             |
 | db_num         | int    | no                    | 0             |
@@ -83,6 +84,10 @@ Redis data types, support `key` `hash` `list` `set` `zset`
 - zset
 
 > Each data from upstream will be added to the configured zset key with a 
 > weight of 1. So the order of data in zset is based on the order of data 
 > consumption.
+
+### batch_size [int]
+
+Controls the number of records written per batch in single-node mode; batch 
writing is not guaranteed in cluster mode.
 
 ### user [string]
 
diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
index 3029f8061d..9af103f884 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -22,6 +22,7 @@ Used to read data from Redis.
 | host                | string | yes                   | -             |
 | port                | int    | yes                   | -             |
 | keys                | string | yes                   | -             |
+| batch_size          | int    | no                    | 10            |
 | data_type           | string | yes                   | -             |
 | user                | string | no                    | -             |
 | auth                | string | no                    | -             |
@@ -113,6 +114,10 @@ each kv that in hash key it will be treated as a row and 
send it to upstream.
 
 keys pattern
 
+### batch_size [int]
+
+Indicates the number of keys to attempt to return per scan iteration. The default is 10.
+
 **Tips:Redis source connector support fuzzy key matching, user needs to ensure 
that the matched keys are the same type**
 
 ### data_type [string]
diff --git a/release-note.md b/release-note.md
index 24455c40ac..0b5e884572 100644
--- a/release-note.md
+++ b/release-note.md
@@ -56,6 +56,7 @@
 - [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
 - [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
 - [Connector-v2] [Mongodb] Support to convert to double from numeric type that 
mongodb saved it as numeric internally (#6997)
+- [Connector-v2] [Redis] Use the scan command instead of the keys command; 
support batch write in single mode (#7030, #7085)
 
 ### Zeta(ST-Engine)
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
new file mode 100644
index 0000000000..5730838cca
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class RedisClient extends Jedis {
+
+    protected final RedisParameters redisParameters;
+
+    protected final int batchSize;
+
+    protected final Jedis jedis;
+
+    protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
+        this.redisParameters = redisParameters;
+        this.batchSize = redisParameters.getBatchSize();
+        this.jedis = jedis;
+    }
+
+    public ScanResult<String> scanKeys(
+            String cursor, int batchSize, String keysPattern, RedisDataType 
type) {
+        ScanParams scanParams = new ScanParams();
+        scanParams.match(keysPattern);
+        scanParams.count(batchSize);
+        return jedis.scan(cursor, scanParams, type.name());
+    }
+
+    public abstract List<String> batchGetString(List<String> keys);
+
+    public abstract List<List<String>> batchGetList(List<String> keys);
+
+    public abstract List<Set<String>> batchGetSet(List<String> keys);
+
+    public abstract List<Map<String, String>> batchGetHash(List<String> keys);
+
+    public abstract List<List<String>> batchGetZset(List<String> keys);
+
+    public abstract void batchWriteString(
+            List<String> keys, List<String> values, long expireSeconds);
+
+    public abstract void batchWriteList(
+            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+
+    public abstract void batchWriteSet(
+            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+
+    public abstract void batchWriteHash(
+            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+
+    public abstract void batchWriteZset(
+            List<String> keyBuffer, List<String> valueBuffer, long 
expireSeconds);
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
new file mode 100644
index 0000000000..13acc89def
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RedisClusterClient extends RedisClient {
+    public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
+        super(redisParameters, jedis);
+    }
+
+    @Override
+    public List<String> batchGetString(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<String> result = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            result.add(jedis.get(key));
+        }
+        return result;
+    }
+
+    @Override
+    public List<List<String>> batchGetList(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<List<String>> result = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            result.add(jedis.lrange(key, 0, -1));
+        }
+        return result;
+    }
+
+    @Override
+    public List<Set<String>> batchGetSet(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<Set<String>> result = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            result.add(jedis.smembers(key));
+        }
+        return result;
+    }
+
+    @Override
+    public List<Map<String, String>> batchGetHash(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<Map<String, String>> result = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            Map<String, String> map = jedis.hgetAll(key);
+            map.put("hash_key", key);
+            result.add(map);
+        }
+        return result;
+    }
+
+    @Override
+    public List<List<String>> batchGetZset(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<List<String>> result = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            result.add(jedis.zrange(key, 0, -1));
+        }
+        return result;
+    }
+
+    @Override
+    public void batchWriteString(List<String> keys, List<String> values, long 
expireSeconds) {
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            RedisDataType.STRING.set(this, keys.get(i), values.get(i), 
expireSeconds);
+        }
+    }
+
+    @Override
+    public void batchWriteList(List<String> keys, List<String> values, long 
expireSeconds) {
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            RedisDataType.LIST.set(this, keys.get(i), values.get(i), 
expireSeconds);
+        }
+    }
+
+    @Override
+    public void batchWriteSet(List<String> keys, List<String> values, long 
expireSeconds) {
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            RedisDataType.SET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+        }
+    }
+
+    @Override
+    public void batchWriteHash(List<String> keys, List<String> values, long 
expireSeconds) {
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            RedisDataType.HASH.set(this, keys.get(i), values.get(i), 
expireSeconds);
+        }
+    }
+
+    @Override
+    public void batchWriteZset(List<String> keys, List<String> values, long 
expireSeconds) {
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            RedisDataType.ZSET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
new file mode 100644
index 0000000000..99bae5e733
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// In standalone mode, pipeline can be used to improve batch read performance
+public class RedisSingleClient extends RedisClient {
+
+    public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) {
+        super(redisParameters, jedis);
+    }
+
+    @Override
+    public List<String> batchGetString(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        String[] keyArr = keys.toArray(new String[0]);
+        return jedis.mget(keyArr);
+    }
+
+    @Override
+    public List<List<String>> batchGetList(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        Pipeline pipeline = jedis.pipelined();
+        List<Response<List<String>>> responses = new ArrayList<>(keys.size());
+
+        for (String key : keys) {
+            responses.add(pipeline.lrange(key, 0, -1));
+        }
+
+        pipeline.sync();
+
+        List<List<String>> resultList = new ArrayList<>(keys.size());
+        for (Response<List<String>> response : responses) {
+            resultList.add(response.get());
+        }
+
+        return resultList;
+    }
+
+    @Override
+    public List<Set<String>> batchGetSet(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        Pipeline pipeline = jedis.pipelined();
+        List<Response<Set<String>>> responses = new ArrayList<>(keys.size());
+
+        for (String key : keys) {
+            responses.add(pipeline.smembers(key));
+        }
+
+        pipeline.sync();
+
+        List<Set<String>> resultList = new ArrayList<>(keys.size());
+        for (Response<Set<String>> response : responses) {
+            resultList.add(response.get());
+        }
+
+        return resultList;
+    }
+
+    @Override
+    public List<Map<String, String>> batchGetHash(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        Pipeline pipeline = jedis.pipelined();
+        List<Response<Map<String, String>>> responses = new 
ArrayList<>(keys.size());
+
+        for (String key : keys) {
+            Response<Map<String, String>> response = pipeline.hgetAll(key);
+            responses.add(response);
+        }
+
+        pipeline.sync();
+
+        List<Map<String, String>> resultList = new ArrayList<>(keys.size());
+        for (int i = 0; i < keys.size(); i++) {
+            Response<Map<String, String>> response = responses.get(i);
+            Map<String, String> map = response.get();
+            if (map != null) {
+                map.put("hash_key", keys.get(i));
+            }
+            resultList.add(map);
+        }
+
+        return resultList;
+    }
+
+    @Override
+    public List<List<String>> batchGetZset(List<String> keys) {
+        if (CollectionUtils.isEmpty(keys)) {
+            return new ArrayList<>();
+        }
+        List<Response<List<String>>> responses = new ArrayList<>(keys.size());
+        Pipeline pipelined = jedis.pipelined();
+        for (String key : keys) {
+            Response<List<String>> response = pipelined.zrange(key, 0, -1);
+            responses.add(response);
+        }
+        pipelined.sync();
+        List<List<String>> resultlist = new ArrayList<>(keys.size());
+        for (Response<List<String>> response : responses) {
+            resultlist.add(response.get());
+        }
+        return resultlist;
+    }
+
+    @Override
+    public void batchWriteString(List<String> keys, List<String> values, long 
expireSeconds) {
+        Pipeline pipelined = jedis.pipelined();
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            String key = keys.get(i);
+            String value = values.get(i);
+            pipelined.set(key, value);
+            if (expireSeconds > 0) {
+                pipelined.expire(key, expireSeconds);
+            }
+        }
+        pipelined.sync();
+    }
+
+    @Override
+    public void batchWriteList(List<String> keys, List<String> values, long 
expireSeconds) {
+        Pipeline pipelined = jedis.pipelined();
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            String key = keys.get(i);
+            String value = values.get(i);
+            pipelined.lpush(key, value);
+            if (expireSeconds > 0) {
+                pipelined.expire(key, expireSeconds);
+            }
+        }
+        pipelined.sync();
+    }
+
+    @Override
+    public void batchWriteSet(List<String> keys, List<String> values, long 
expireSeconds) {
+        Pipeline pipelined = jedis.pipelined();
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            String key = keys.get(i);
+            String value = values.get(i);
+            pipelined.sadd(key, value);
+            if (expireSeconds > 0) {
+                pipelined.expire(key, expireSeconds);
+            }
+        }
+        pipelined.sync();
+    }
+
+    @Override
+    public void batchWriteHash(List<String> keys, List<String> values, long 
expireSeconds) {
+        Pipeline pipelined = jedis.pipelined();
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            String key = keys.get(i);
+            String value = values.get(i);
+            Map<String, String> fieldsMap = JsonUtils.toMap(value);
+            pipelined.hset(key, fieldsMap);
+            if (expireSeconds > 0) {
+                pipelined.expire(key, expireSeconds);
+            }
+        }
+        pipelined.sync();
+    }
+
+    @Override
+    public void batchWriteZset(List<String> keys, List<String> values, long 
expireSeconds) {
+        Pipeline pipelined = jedis.pipelined();
+        int size = keys.size();
+        for (int i = 0; i < size; i++) {
+            String key = keys.get(i);
+            String value = values.get(i);
+            pipelined.zadd(key, 1, value);
+            if (expireSeconds > 0) {
+                pipelined.expire(key, expireSeconds);
+            }
+        }
+        pipelined.sync();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index c8a0a02dc7..3be5b39de9 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -83,7 +83,7 @@ public class RedisConfig {
             Options.key("data_type")
                     .enumType(RedisDataType.class)
                     .noDefaultValue()
-                    .withDescription("redis data types, support key hash list 
set zset.");
+                    .withDescription("redis data types, support string hash 
list set zset.");
 
     public static final Option<RedisConfig.Format> FORMAT =
             Options.key("format")
@@ -119,6 +119,14 @@ public class RedisConfig {
                     .defaultValue(-1L)
                     .withDescription("Set redis expiration time.");
 
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "batch_size is used to control the size of a batch 
of data during read and write operations"
+                                    + ",default 10");
+
     public enum Format {
         JSON,
         // TEXT will be supported later
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index a315e0cdae..aac874254d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -40,6 +40,18 @@ public enum RedisDataType {
             return Collections.singletonList(jedis.get(key));
         }
     },
+    STRING {
+        @Override
+        public void set(Jedis jedis, String key, String value, long expire) {
+            jedis.set(key, value);
+            expire(jedis, key, expire);
+        }
+
+        @Override
+        public List<String> get(Jedis jedis, String key) {
+            return Collections.singletonList(jedis.get(key));
+        }
+    },
     HASH {
         @Override
         public void set(Jedis jedis, String key, String value, long expire) {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index d32f3bbb0d..ef1fe104d7 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -19,6 +19,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -48,6 +51,7 @@ public class RedisParameters implements Serializable {
     private RedisConfig.HashKeyParseMode hashKeyParseMode;
     private List<String> redisNodes = Collections.emptyList();
     private long expire = RedisConfig.EXPIRE.defaultValue();
+    private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
 
     public void buildWithConfig(ReadonlyConfig config) {
         // set host
@@ -84,6 +88,17 @@ public class RedisParameters implements Serializable {
         }
         // set redis data type verification factory createAndPrepareSource
         this.redisDataType = config.get(RedisConfig.DATA_TYPE);
+        // Indicates the number of keys to attempt to return per 
iteration.default 10
+        this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+    }
+
+    public RedisClient buildRedisClient() {
+        Jedis jedis = this.buildJedis();
+        if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
+            return new RedisSingleClient(this, jedis);
+        } else {
+            return new RedisClusterClient(this, jedis);
+        }
     }
 
     public Jedis buildJedis() {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 23eda57202..f03c5c48c8 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -21,24 +21,30 @@ import 
org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
-import redis.clients.jedis.Jedis;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 
 public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
     private final SeaTunnelRowType seaTunnelRowType;
     private final RedisParameters redisParameters;
     private final SerializationSchema serializationSchema;
-    private final Jedis jedis;
+    private final RedisClient redisClient;
+
+    private final int batchSize;
+
+    private final List<String> keyBuffer;
+    private final List<String> valueBuffer;
 
     public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters 
redisParameters) {
         this.seaTunnelRowType = seaTunnelRowType;
@@ -46,13 +52,15 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         // TODO according to format to initialize serializationSchema
         // Now temporary using json serializationSchema
         this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
-        this.jedis = redisParameters.buildJedis();
+        this.redisClient = redisParameters.buildRedisClient();
+        this.batchSize = redisParameters.getBatchSize();
+        this.keyBuffer = new ArrayList<>(batchSize);
+        this.valueBuffer = new ArrayList<>(batchSize);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
         String data = new String(serializationSchema.serialize(element));
-        RedisDataType redisDataType = redisParameters.getRedisDataType();
         String keyField = redisParameters.getKeyField();
         List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
         String key;
@@ -61,14 +69,51 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         } else {
             key = keyField;
         }
-        long expire = redisParameters.getExpire();
-        redisDataType.set(jedis, key, data, expire);
+        keyBuffer.add(key);
+        valueBuffer.add(data);
+        if (keyBuffer.size() >= batchSize) {
+            doBatchWrite();
+            clearBuffer();
+        }
+    }
+
+    private void clearBuffer() {
+        keyBuffer.clear();
+        valueBuffer.clear();
+    }
+
+    private void doBatchWrite() {
+        RedisDataType redisDataType = redisParameters.getRedisDataType();
+        if (RedisDataType.KEY.equals(redisDataType) || 
RedisDataType.STRING.equals(redisDataType)) {
+            redisClient.batchWriteString(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            return;
+        }
+        if (RedisDataType.LIST.equals(redisDataType)) {
+            redisClient.batchWriteList(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            return;
+        }
+        if (RedisDataType.SET.equals(redisDataType)) {
+            redisClient.batchWriteSet(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            return;
+        }
+        if (RedisDataType.HASH.equals(redisDataType)) {
+            redisClient.batchWriteHash(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            return;
+        }
+        if (RedisDataType.ZSET.equals(redisDataType)) {
+            redisClient.batchWriteZset(keyBuffer, valueBuffer, 
redisParameters.getExpire());
+            return;
+        }
+        throw new RedisConnectorException(
+                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                "UnSupport redisDataType,only support 
string,list,hash,set,zset");
     }
 
     @Override
     public void close() throws IOException {
-        if (Objects.nonNull(jedis)) {
-            jedis.close();
+        if (!keyBuffer.isEmpty()) {
+            doBatchWrite();
+            clearBuffer();
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index 7dcea693fa..10825be8bc 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -19,17 +19,21 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.source;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 
-import redis.clients.jedis.Jedis;
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,7 +45,7 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     private final RedisParameters redisParameters;
     private final SingleSplitReaderContext context;
     private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
-    private Jedis jedis;
+    private RedisClient redisClient;
 
     public RedisSourceReader(
             RedisParameters redisParameters,
@@ -54,46 +58,138 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
 
+    /** Creates the Redis client from the configured {@code redisParameters}. */
     @Override
     public void open() throws Exception {
-        this.jedis = redisParameters.buildJedis();
+        this.redisClient = redisParameters.buildRedisClient();
     }
 
+    /** Closes the Redis client; does nothing when no client has been assigned. */
     @Override
     public void close() throws IOException {
-        if (Objects.nonNull(jedis)) {
-            jedis.close();
+        if (Objects.nonNull(redisClient)) {
+            redisClient.close();
         }
     }
 
     @Override
     public void internalPollNext(Collector<SeaTunnelRow> output) throws 
Exception {
-        Set<String> keys = jedis.keys(redisParameters.getKeysPattern());
-        RedisDataType redisDataType = redisParameters.getRedisDataType();
-        for (String key : keys) {
-            List<String> values = redisDataType.get(jedis, key);
-            for (String value : values) {
-                if (deserializationSchema == null) {
-                    output.collect(new SeaTunnelRow(new Object[] {value}));
-                } else {
-                    if (redisParameters.getHashKeyParseMode() == 
RedisConfig.HashKeyParseMode.KV
-                            && redisDataType == RedisDataType.HASH) {
-                        // Treat each key-value pair in the hash-key as one 
piece of data
-                        Map<String, String> recordsMap = 
JsonUtils.toMap(value);
-                        for (Map.Entry<String, String> entry : 
recordsMap.entrySet()) {
-                            String k = entry.getKey();
-                            String v = entry.getValue();
-                            Map<String, String> valuesMap = JsonUtils.toMap(v);
-                            SeaTunnelDataType<SeaTunnelRow> seaTunnelRowType =
-                                    deserializationSchema.getProducedType();
-                            valuesMap.put(((SeaTunnelRowType) 
seaTunnelRowType).getFieldName(0), k);
-                            deserializationSchema.deserialize(
-                                    
JsonUtils.toJsonString(valuesMap).getBytes(), output);
-                        }
-                    } else {
-                        deserializationSchema.deserialize(value.getBytes(), 
output);
-                    }
-                }
+        RedisDataType redisDataType = 
resolveScanType(redisParameters.getRedisDataType());
+        String cursor = ScanParams.SCAN_POINTER_START;
+        String keysPattern = redisParameters.getKeysPattern();
+        int batchSize = redisParameters.getBatchSize();
+        while (true) {
+            // String cursor, int batchSize, String keysPattern, RedisType type
+            ScanResult<String> scanResult =
+                    redisClient.scanKeys(cursor, batchSize, keysPattern, 
redisDataType);
+            cursor = scanResult.getCursor();
+            List<String> keys = scanResult.getResult();
+            if (CollectionUtils.isEmpty(keys)) {
+                break;
+            }
+            pollNext(keys, redisDataType, output);
+            // when cursor return "0", scan end
+            if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
+                break;
             }
         }
         context.signalNoMoreElement();
     }
+
+    /**
+     * Routes one page of scanned keys to the reader matching the given Redis data type.
+     *
+     * @param keys keys returned by the current scan page
+     * @param dataType data type of the values stored under {@code keys}
+     * @param output collector that receives the produced rows
+     * @throws IOException if the type-specific reader fails
+     * @throws RedisConnectorException if {@code dataType} is none of the handled types
+     */
+    private void pollNext(List<String> keys, RedisDataType dataType, Collector<SeaTunnelRow> output)
+            throws IOException {
+        if (RedisDataType.HASH.equals(dataType)) {
+            pollHashMapToNext(keys, output);
+        } else if (RedisDataType.STRING.equals(dataType) || RedisDataType.KEY.equals(dataType)) {
+            pollStringToNext(keys, output);
+        } else if (RedisDataType.LIST.equals(dataType)) {
+            pollListToNext(keys, output);
+        } else if (RedisDataType.SET.equals(dataType)) {
+            pollSetToNext(keys, output);
+        } else if (RedisDataType.ZSET.equals(dataType)) {
+            pollZsetToNext(keys, output);
+        } else {
+            throw new RedisConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "UnSupport redisDataType,only support 
string,list,hash,set,zset");
+        }
+    }
+
+    /** Fetches the sorted sets stored under {@code keys} and emits every member as one value. */
+    private void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> output)
+            throws IOException {
+        for (List<String> members : redisClient.batchGetZset(keys)) {
+            for (String member : members) {
+                pollValueToNext(member, output);
+            }
+        }
+    }
+
+    /** Fetches the sets stored under {@code keys} and emits every member as one value. */
+    private void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> output)
+            throws IOException {
+        for (Set<String> members : redisClient.batchGetSet(keys)) {
+            for (String member : members) {
+                pollValueToNext(member, output);
+            }
+        }
+    }
+
+    /** Fetches the lists stored under {@code keys} and emits every element as one value. */
+    private void pollListToNext(List<String> keys, Collector<SeaTunnelRow> output)
+            throws IOException {
+        for (List<String> elements : redisClient.batchGetList(keys)) {
+            for (String element : elements) {
+                pollValueToNext(element, output);
+            }
+        }
+    }
+
+    /** Fetches the string values stored under {@code keys} and emits each one. */
+    private void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> output)
+            throws IOException {
+        for (String stringValue : redisClient.batchGetString(keys)) {
+            pollValueToNext(stringValue, output);
+        }
+    }
+
+    /**
+     * Emits a single Redis value.
+     *
+     * <p>Without a deserialization schema the value is wrapped as-is in a one-field row;
+     * otherwise its bytes are handed to the schema, which collects the resulting rows.
+     *
+     * @param value value read from Redis
+     * @param output collector that receives the produced rows
+     * @throws IOException if the schema fails to deserialize the value
+     */
+    private void pollValueToNext(String value, Collector<SeaTunnelRow> output) throws IOException {
+        if (deserializationSchema == null) {
+            output.collect(new SeaTunnelRow(new Object[] {value}));
+            return;
+        }
+        // Encode explicitly as UTF-8: the no-arg getBytes() uses the JVM default charset, which
+        // can mangle non-ASCII values on hosts whose default is not UTF-8.
+        deserializationSchema.deserialize(
+                value.getBytes(java.nio.charset.StandardCharsets.UTF_8), output);
+    }
+
+    /**
+     * Fetches the hashes stored under {@code keys} and emits one record per hash.
+     *
+     * <p>Each hash is rendered as a JSON object string. When a deserialization schema is present
+     * and the hash-key parse mode is {@code KV}, the JSON bytes are passed to the schema; in all
+     * other cases the JSON string is emitted as a one-field row.
+     *
+     * <p>NOTE(review): the pre-SCAN implementation expanded every hash field into its own record
+     * in KV mode and ran the schema in the other mode as well; confirm that the behaviour here is
+     * the intended replacement.
+     *
+     * @param keys keys of the hashes to read
+     * @param output collector that receives the produced rows
+     * @throws IOException if the schema fails to deserialize a hash
+     */
+    private void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow> output)
+            throws IOException {
+        List<Map<String, String>> hashes = redisClient.batchGetHash(keys);
+        // The parse mode is loop-invariant, so decide once whether the schema is used at all.
+        boolean useSchema =
+                deserializationSchema != null
+                        && redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV;
+        for (Map<String, String> hash : hashes) {
+            String json = JsonUtils.toJsonString(hash);
+            if (useSchema) {
+                // Explicit UTF-8 instead of the JVM default charset used by the no-arg getBytes().
+                deserializationSchema.deserialize(
+                        json.getBytes(java.nio.charset.StandardCharsets.UTF_8), output);
+            } else {
+                output.collect(new SeaTunnelRow(new Object[] {json}));
+            }
+        }
+    }
+
+    /**
+     * Maps the configured data type to the type used for scanning: {@code KEY} is scanned as
+     * {@code STRING}; every other type is returned unchanged.
+     */
+    private RedisDataType resolveScanType(RedisDataType dataType) {
+        return RedisDataType.KEY.equals(dataType) ? RedisDataType.STRING : dataType;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 7b03818c0b..0b9ed1bedb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -56,7 +56,9 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Stream;
 
@@ -223,6 +225,117 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         jedis.select(0);
     }
 
+    /** Scans 1000 string keys and checks that every value reaches the sink list. */
+    @TestTemplate
+    public void testScanStringTypeWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        String keyPrefix = "string_test";
+        try {
+            for (int i = 0; i < 1000; i++) {
+                jedis.set(keyPrefix + i, "val");
+            }
+            Container.ExecResult execResult = container.executeJob("/scan-string-to-redis.conf");
+            Assertions.assertEquals(0, execResult.getExitCode());
+            List<String> list = jedis.lrange("string_test_list", 0, -1);
+            Assertions.assertEquals(1000, list.size());
+        } finally {
+            // Clean up even when an assertion fails so later tests start from a clean Redis.
+            jedis.del("string_test_list");
+            for (int i = 0; i < 1000; i++) {
+                jedis.del(keyPrefix + i);
+            }
+        }
+    }
+
+    /** Scans 100 lists of 10 elements each and checks that all 1000 elements reach the sink. */
+    @TestTemplate
+    public void testScanListTypeWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        String keyPrefix = "list-test-read";
+        try {
+            for (int i = 0; i < 100; i++) {
+                String list = keyPrefix + i;
+                for (int j = 0; j < 10; j++) {
+                    jedis.lpush(list, "val" + j);
+                }
+            }
+            Container.ExecResult execResult =
+                    container.executeJob("/scan-list-test-read-to-redis-list-test-check.conf");
+            Assertions.assertEquals(0, execResult.getExitCode());
+            List<String> list = jedis.lrange("list-test-check", 0, -1);
+            Assertions.assertEquals(1000, list.size());
+        } finally {
+            // Clean up even when an assertion fails so later tests start from a clean Redis.
+            jedis.del("list-test-check");
+            for (int i = 0; i < 100; i++) {
+                jedis.del(keyPrefix + i);
+            }
+        }
+    }
+
+    /** Scans 100 sets of 10 members each and checks that all 1000 members reach the sink. */
+    @TestTemplate
+    public void testScanSetTypeWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        String setKeyPrefix = "key-test-set";
+        try {
+            for (int i = 0; i < 100; i++) {
+                String setKey = setKeyPrefix + i;
+                for (int j = 0; j < 10; j++) {
+                    jedis.sadd(setKey, j + "");
+                }
+            }
+            Container.ExecResult execResult =
+                    container.executeJob("/scan-set-to-redis-list-set-check.conf");
+            Assertions.assertEquals(0, execResult.getExitCode());
+            List<String> list = jedis.lrange("list-set-check", 0, -1);
+            Assertions.assertEquals(1000, list.size());
+        } finally {
+            // Clean up even when an assertion fails so later tests start from a clean Redis.
+            jedis.del("list-set-check");
+            for (int i = 0; i < 100; i++) {
+                jedis.del(setKeyPrefix + i);
+            }
+        }
+    }
+
+    /** Scans 100 hashes and checks that each hash reaches the sink list as one element. */
+    @TestTemplate
+    public void testScanHashTypeWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        String hashKeyPrefix = "key-test-hash";
+        try {
+            for (int i = 0; i < 100; i++) {
+                String hashKey = hashKeyPrefix + i;
+                Map<String, String> map = new HashMap<>();
+                map.put("name", "fuyoujie");
+                jedis.hset(hashKey, map);
+            }
+            Container.ExecResult execResult =
+                    container.executeJob("/scan-hash-to-redis-list-hash-check.conf");
+            Assertions.assertEquals(0, execResult.getExitCode());
+            List<String> list = jedis.lrange("list-hash-check", 0, -1);
+            Assertions.assertEquals(100, list.size());
+        } finally {
+            // Clean up even when an assertion fails so later tests start from a clean Redis.
+            // One DEL per key is enough; it removes the whole hash.
+            jedis.del("list-hash-check");
+            for (int i = 0; i < 100; i++) {
+                jedis.del(hashKeyPrefix + i);
+            }
+        }
+    }
+
+    /** Scans 100 sorted sets of 10 members each and checks that all 1000 reach the sink. */
+    @TestTemplate
+    public void testScanZsetTypeWriteRedis(TestContainer container)
+            throws IOException, InterruptedException {
+        String zSetKeyPrefix = "key-test-zset";
+        try {
+            for (int i = 0; i < 100; i++) {
+                String key = zSetKeyPrefix + i;
+                for (int j = 0; j < 10; j++) {
+                    jedis.zadd(key, 1, j + "");
+                }
+            }
+            Container.ExecResult execResult =
+                    container.executeJob("/scan-zset-to-redis-list-zset-check.conf");
+            Assertions.assertEquals(0, execResult.getExitCode());
+            List<String> list = jedis.lrange("list-zset-check", 0, -1);
+            Assertions.assertEquals(1000, list.size());
+        } finally {
+            // Clean up even when an assertion fails so later tests start from a clean Redis.
+            jedis.del("list-zset-check");
+            for (int i = 0; i < 100; i++) {
+                jedis.del(zSetKeyPrefix + i);
+            }
+        }
+    }
+
     @TestTemplate
     @DisabledOnContainer(
             value = {},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
index 5b46c14171..9c1dec0c36 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
@@ -36,6 +36,7 @@ source {
     keys = "key_test*"
     data_type = key
     db_num=1
+    batch_size = 33
   }
 }
 
@@ -47,5 +48,6 @@ sink {
     key = "db_test"
     data_type = list
     db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
index cd367ad872..133773d799 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
@@ -35,6 +35,7 @@ source {
     auth = "U2VhVHVubmVs"
     keys = "key_test*"
     data_type = key
+    batch_size = 33
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
index 17c5de5489..20fee261f0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
@@ -34,7 +34,8 @@ source {
     port = 6379
     auth = "U2VhVHVubmVs"
     keys = "key_test*"
-    data_type = key
+    data_type = string
+    batch_size = 33
   }
 }
 
@@ -45,5 +46,6 @@ sink {
     auth = "U2VhVHVubmVs"
     key = "key_list"
     data_type = list
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
index 5b46c14171..ee76a0a839 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
@@ -33,9 +33,9 @@ source {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    keys = "key_test*"
-    data_type = key
-    db_num=1
+    keys = "key-test-hash*"
+    data_type = hash
+    batch_size = 33
   }
 }
 
@@ -44,8 +44,8 @@ sink {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    key = "db_test"
+    key = "list-hash-check"
     data_type = list
-    db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
index 5b46c14171..62db2eaa16 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
@@ -33,9 +33,9 @@ source {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    keys = "key_test*"
-    data_type = key
-    db_num=1
+    keys = "list-test-read*"
+    data_type = list
+    batch_size = 33
   }
 }
 
@@ -44,8 +44,8 @@ sink {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    key = "db_test"
+    key = "list-test-check"
     data_type = list
-    db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
index 5b46c14171..4c64224863 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
@@ -33,9 +33,9 @@ source {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    keys = "key_test*"
-    data_type = key
-    db_num=1
+    keys = "key-test-set*"
+    data_type = set
+    batch_size = 33
   }
 }
 
@@ -44,8 +44,8 @@ sink {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    key = "db_test"
+    key = "list-set-check"
     data_type = list
-    db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
index 5b46c14171..317e603978 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
@@ -33,9 +33,9 @@ source {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    keys = "key_test*"
-    data_type = key
-    db_num=1
+    keys = "string_test*"
+    data_type = string
+    batch_size = 33
   }
 }
 
@@ -44,8 +44,8 @@ sink {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    key = "db_test"
+    key = "string_test_list"
     data_type = list
-    db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
similarity index 91%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
index 5b46c14171..f7d2c05d37 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
@@ -33,9 +33,9 @@ source {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    keys = "key_test*"
-    data_type = key
-    db_num=1
+    keys = "key-test-zset*"
+    data_type = zset
+    batch_size = 33
   }
 }
 
@@ -44,8 +44,8 @@ sink {
     host = "redis-e2e"
     port = 6379
     auth = "U2VhVHVubmVs"
-    key = "db_test"
+    key = "list-zset-check"
     data_type = list
-    db_num=2
+    batch_size = 33
   }
 }
\ No newline at end of file

Reply via email to