This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6e70cbe334 [improve][Redis]Redis scan command supports versions 5, 6, 
7 (#7666)
6e70cbe334 is described below

commit 6e70cbe3340353d33a4c191f0067cdecfeb41fe3
Author: FuYouJ <[email protected]>
AuthorDate: Sat Sep 21 22:07:49 2024 +0800

    [improve][Redis]Redis scan command supports versions 5, 6, 7 (#7666)
---
 docs/en/connector-v2/source/Redis.md               | 57 +++++++++++++++-------
 release-note.md                                    |  3 +-
 .../seatunnel/redis/client/RedisClient.java        | 35 ++++++++++++-
 .../seatunnel/redis/client/RedisClusterClient.java |  4 +-
 .../seatunnel/redis/client/RedisSingleClient.java  |  4 +-
 .../seatunnel/redis/config/RedisParameters.java    | 47 +++++++++++++++---
 .../seatunnel/redis/exception/RedisErrorCode.java  | 42 ++++++++++++++++
 .../seatunnel/redis/source/RedisSourceReader.java  |  6 +--
 .../seatunnel/e2e/connector/redis/Redis5IT.java    | 25 ++++++++++
 .../seatunnel/e2e/connector/redis/Redis7IT.java    | 25 ++++++++++
 .../e2e/connector/redis/RedisContainerInfo.java    | 47 ++++++++++++++++++
 .../{RedisIT.java => RedisTestCaseTemplateIT.java} | 47 +++++++++++-------
 12 files changed, 291 insertions(+), 51 deletions(-)

diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
index 9f4e86c8fb..bd60830ba3 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,8 +17,8 @@ Used to read data from Redis.
 
 ## Options
 
-|        name         |  type  |       required        | default value |
-|---------------------|--------|-----------------------|---------------|
+| name                | type   | required              | default value |
+| ------------------- | ------ | --------------------- | ------------- |
 | host                | string | yes                   | -             |
 | port                | int    | yes                   | -             |
 | keys                | string | yes                   | -             |
@@ -67,7 +67,6 @@ for example, if the value of hash key is the following shown:
 if hash_key_parse_mode is `all` and schema config as the following shown, it 
will generate the following data:
 
 ```hocon
-
 schema {
   fields {
     001 {
@@ -83,14 +82,13 @@ schema {
 
 ```
 
-|               001               |            002            |
-|---------------------------------|---------------------------|
+| 001                             | 002                       |
+| ------------------------------- | ------------------------- |
 | Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
 
 if hash_key_parse_mode is `kv` and schema config as the following shown, it 
will generate the following data:
 
 ```hocon
-
 schema {
   fields {
     hash_key = string
@@ -101,10 +99,10 @@ schema {
 
 ```
 
-| hash_key |     name      | age |
-|----------|---------------|-----|
-| 001      | tyrantlucifer | 26  |
-| 002      | Zongwen       | 26  |
+| hash_key | name          | age  |
+| -------- | ------------- | ---- |
+| 001      | tyrantlucifer | 26   |
+| 002      | Zongwen       | 26   |
 
 each kv that in hash key it will be treated as a row and send it to upstream.
 
@@ -180,7 +178,6 @@ when you assign format is `json`, you should also assign 
schema option, for exam
 upstream data is the following:
 
 ```json
-
 {"code":  200, "data":  "get success", "success":  true}
 
 ```
@@ -188,7 +185,6 @@ upstream data is the following:
 you should assign schema as the following:
 
 ```hocon
-
 schema {
     fields {
         code = int
@@ -201,8 +197,8 @@ schema {
 
 connector will generate data as the following:
 
-| code |    data     | success |
-|------|-------------|---------|
+| code | data        | success |
+| ---- | ----------- | ------- |
 | 200  | get success | true    |
 
 when you assign format is `text`, connector will do nothing for upstream data, 
for example:
@@ -210,15 +206,14 @@ when you assign format is `text`, connector will do 
nothing for upstream data, f
 upstream data is the following:
 
 ```json
-
 {"code":  200, "data":  "get success", "success":  true}
 
 ```
 
 connector will generate data as the following:
 
-|                         content                          |
-|----------------------------------------------------------|
+| content                                                  |
+| -------------------------------------------------------- |
 | {"code":  200, "data":  "get success", "success":  true} |
 
 ### schema [config]
@@ -261,6 +256,32 @@ Redis {
 }
 ```
 
+read string type keys and append them to a list
+
+```hocon
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "string_test*"
+    data_type = string
+    batch_size = 33
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "string_test_list"
+    data_type = list
+    batch_size = 33
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
@@ -270,4 +291,4 @@ Redis {
 ### next version
 
 - [Improve] Support redis cluster mode connection and user authentication 
[3188](https://github.com/apache/seatunnel/pull/3188)
-
+- [Bug] Redis scan command supports versions 5, 6, 7 
[7666](https://github.com/apache/seatunnel/pull/7666)
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index 4ed0d51fed..6147093eee 100644
--- a/release-note.md
+++ b/release-note.md
@@ -58,7 +58,7 @@
 - [Connector-v2] [Mongodb] Support to convert to double from numeric type that 
mongodb saved it as numeric internally (#6997)
 - [Connector-v2] [Redis] Using scan replace keys operation command,support 
batchWrite in single mode(#7030,#7085)
 - [Connector-V2] [Clickhouse] Add a new optional configuration 
`clickhouse.config` to the source connector of ClickHouse (#7143)
-- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
+- [Connector-V2] [Redis] Redis scan command supports versions 3, 4, 5, 6, 7 
(#7666)
 
 ### Zeta(ST-Engine)
 
@@ -200,6 +200,7 @@
 - [Connector-V2] [Assert] Support field type assert and field value equality 
assert for full data types (#6275)
 - [Connector-V2] [Iceberg] Support iceberg sink #6198
 - [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
+- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
 
 ### Zeta(ST-Engine)
 
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index 5730838cca..109c51f78a 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -24,6 +24,7 @@ import redis.clients.jedis.Jedis;
 import redis.clients.jedis.params.ScanParams;
 import redis.clients.jedis.resps.ScanResult;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,14 +33,19 @@ public abstract class RedisClient extends Jedis {
 
     protected final RedisParameters redisParameters;
 
+    private final Integer redisVersion;
+
     protected final int batchSize;
 
     protected final Jedis jedis;
 
-    protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
+    private static final int REDIS_5 = 5;
+
+    protected RedisClient(RedisParameters redisParameters, Jedis jedis, int 
redisVersion) {
         this.redisParameters = redisParameters;
         this.batchSize = redisParameters.getBatchSize();
         this.jedis = jedis;
+        this.redisVersion = redisVersion;
     }
 
     public ScanResult<String> scanKeys(
@@ -47,7 +53,32 @@ public abstract class RedisClient extends Jedis {
         ScanParams scanParams = new ScanParams();
         scanParams.match(keysPattern);
         scanParams.count(batchSize);
-        return jedis.scan(cursor, scanParams, type.name());
+        return scanByRedisVersion(cursor, scanParams, type, redisVersion);
+    }
+
+    private ScanResult<String> scanByRedisVersion(
+            String cursor, ScanParams scanParams, RedisDataType type, Integer 
redisVersion) {
+        if (redisVersion <= REDIS_5) {
+            return scanOnRedis5(cursor, scanParams, type);
+        } else {
+            return jedis.scan(cursor, scanParams, type.name());
+        }
+    }
+
+    // When the version is redis5 or earlier, scan command does not support 
type
+    private ScanResult<String> scanOnRedis5(
+            String cursor, ScanParams scanParams, RedisDataType type) {
+        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+        String resultCursor = scanResult.getCursor();
+        List<String> keys = scanResult.getResult();
+        List<String> typeKeys = new ArrayList<>(keys.size());
+        for (String key : keys) {
+            String keyType = jedis.type(key);
+            if (type.name().equalsIgnoreCase(keyType)) {
+                typeKeys.add(key);
+            }
+        }
+        return new ScanResult<>(resultCursor, typeKeys);
     }
 
     public abstract List<String> batchGetString(List<String> keys);
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index 13acc89def..bd687e6c9b 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import java.util.Set;
 
 public class RedisClusterClient extends RedisClient {
-    public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
-        super(redisParameters, jedis);
+    public RedisClusterClient(RedisParameters redisParameters, Jedis jedis, 
int redisVersion) {
+        super(redisParameters, jedis, redisVersion);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index 99bae5e733..f79aa46e98 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -34,8 +34,8 @@ import java.util.Set;
 // In standalone mode, pipeline can be used to improve batch read performance
 public class RedisSingleClient extends RedisClient {
 
-    public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) {
-        super(redisParameters, jedis);
+    public RedisSingleClient(RedisParameters redisParameters, Jedis jedis, int 
redisVersion) {
+        super(redisParameters, jedis, redisVersion);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index ef1fe104d7..3d7e954f1d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.redis.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorE
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 import redis.clients.jedis.ConnectionPoolConfig;
 import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.Jedis;
@@ -37,7 +38,11 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_VERSION_INFO_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.INVALID_CONFIG;
+
 @Data
+@Slf4j
 public class RedisParameters implements Serializable {
     private String host;
     private int port;
@@ -53,6 +58,8 @@ public class RedisParameters implements Serializable {
     private long expire = RedisConfig.EXPIRE.defaultValue();
     private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
 
+    private int redisVersion;
+
     public void buildWithConfig(ReadonlyConfig config) {
         // set host
         this.host = config.get(RedisConfig.HOST);
@@ -94,11 +101,40 @@ public class RedisParameters implements Serializable {
 
     public RedisClient buildRedisClient() {
         Jedis jedis = this.buildJedis();
+        this.redisVersion = extractRedisVersion(jedis);
         if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
-            return new RedisSingleClient(this, jedis);
+            return new RedisSingleClient(this, jedis, redisVersion);
         } else {
-            return new RedisClusterClient(this, jedis);
+            return new RedisClusterClient(this, jedis, redisVersion);
+        }
+    }
+
+    private int extractRedisVersion(Jedis jedis) {
+        log.info("Try to get redis version information from the jedis.info() 
method");
+        // # Server
+        // redis_version:5.0.14
+        // redis_git_sha1:00000000
+        // redis_git_dirty:0
+        String info = jedis.info();
+        try {
+            for (String line : info.split("\n")) {
+                if (line.startsWith("redis_version:")) {
+                    // 5.0.14
+                    String versionInfo = line.split(":")[1].trim();
+                    log.info("The version of Redis is :{}", versionInfo);
+                    String[] parts = versionInfo.split("\\.");
+                    return Integer.parseInt(parts[0]);
+                }
+            }
+        } catch (Exception e) {
+            throw new RedisConnectorException(
+                    GET_REDIS_VERSION_INFO_FAILED,
+                    GET_REDIS_VERSION_INFO_FAILED.getErrorMessage(),
+                    e);
         }
+        throw new RedisConnectorException(
+                GET_REDIS_VERSION_INFO_FAILED,
+                "Did not get the expected redis_version from the jedis.info() 
method");
     }
 
     public Jedis buildJedis() {
@@ -122,7 +158,7 @@ public class RedisParameters implements Serializable {
                         String[] splits = redisNode.split(":");
                         if (splits.length != 2) {
                             throw new RedisConnectorException(
-                                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                                    INVALID_CONFIG,
                                     "Invalid redis node information,"
                                             + "redis node information must 
like as the following: [host:port]");
                         }
@@ -151,8 +187,7 @@ public class RedisParameters implements Serializable {
             default:
                 // do nothing
                 throw new RedisConnectorException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
-                        "Not support this redis mode");
+                        CommonErrorCode.OPERATION_NOT_SUPPORTED, "Not support 
this redis mode");
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
new file mode 100644
index 0000000000..4d9bb745bd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -0,0 +1,42 @@
+package org.apache.seatunnel.connectors.seatunnel.redis.exception;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum RedisErrorCode implements SeaTunnelErrorCode {
+    GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the 
redis version"),
+    INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");
+
+    private final String code;
+    private final String description;
+
+    RedisErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index 10825be8bc..be67c266f9 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -80,9 +80,6 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
                     redisClient.scanKeys(cursor, batchSize, keysPattern, 
redisDataType);
             cursor = scanResult.getCursor();
             List<String> keys = scanResult.getResult();
-            if (CollectionUtils.isEmpty(keys)) {
-                break;
-            }
             pollNext(keys, redisDataType, output);
             // when cursor return "0", scan end
             if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
@@ -94,6 +91,9 @@ public class RedisSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
 
     private void pollNext(List<String> keys, RedisDataType dataType, 
Collector<SeaTunnelRow> output)
             throws IOException {
+        if (CollectionUtils.isEmpty(keys)) {
+            return;
+        }
         if (RedisDataType.HASH.equals(dataType)) {
             pollHashMapToNext(keys, output);
             return;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
new file mode 100644
index 0000000000..6df6e80d6e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class Redis5IT extends RedisTestCaseTemplateIT {
+
+    @Override
+    public RedisContainerInfo getRedisContainerInfo() {
+        return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel", 
"redis:5");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
new file mode 100644
index 0000000000..dfa46e886a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class Redis7IT extends RedisTestCaseTemplateIT {
+
+    @Override
+    public RedisContainerInfo getRedisContainerInfo() {
+        return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel", 
"redis:7");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
new file mode 100644
index 0000000000..61b55a6594
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class RedisContainerInfo {
+    private final String host;
+    private final int port;
+    private final String password;
+    private final String imageName;
+
+    public RedisContainerInfo(String host, int port, String password, String 
imageName) {
+        this.host = host;
+        this.port = port;
+        this.password = password;
+        this.imageName = imageName;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getImageName() {
+        return imageName;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
similarity index 91%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 280ca94faf..66288bbb15 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.seatunnel.e2e.connector.redis;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
@@ -63,14 +62,15 @@ import java.util.Objects;
 import java.util.stream.Stream;
 
 @Slf4j
-public class RedisIT extends TestSuiteBase implements TestResource {
-    private static final String IMAGE = "redis:latest";
-    private static final String HOST = "redis-e2e";
-    private static final int PORT = 6379;
-    private static final String PASSWORD = "SeaTunnel";
+public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements 
TestResource {
+
+    private String host;
+    private int port;
+    private String password;
+
+    private String imageName;
 
-    private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>> 
TEST_DATASET =
-            generateTestDataSet();
+    private Pair<SeaTunnelRowType, List<SeaTunnelRow>> testDateSet;
 
     private GenericContainer<?> redisContainer;
 
@@ -79,13 +79,15 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
     @BeforeAll
     @Override
     public void startUp() {
+        initContainerInfo();
         this.redisContainer =
-                new GenericContainer<>(DockerImageName.parse(IMAGE))
+                new GenericContainer<>(DockerImageName.parse(imageName))
                         .withNetwork(NETWORK)
-                        .withNetworkAliases(HOST)
-                        .withExposedPorts(PORT)
-                        .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
-                        .withCommand(String.format("redis-server --requirepass 
%s", PASSWORD))
+                        .withNetworkAliases(host)
+                        .withExposedPorts(port)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName)))
+                        .withCommand(String.format("redis-server --requirepass 
%s", password))
                         .waitingFor(
                                 new HostPortWaitStrategy()
                                         
.withStartupTimeout(Duration.ofMinutes(2)));
@@ -95,10 +97,19 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         this.initSourceData();
     }
 
+    private void initContainerInfo() {
+        RedisContainerInfo redisContainerInfo = getRedisContainerInfo();
+        this.host = redisContainerInfo.getHost();
+        this.port = redisContainerInfo.getPort();
+        this.password = redisContainerInfo.getPassword();
+        this.imageName = redisContainerInfo.getImageName();
+        this.testDateSet = generateTestDataSet();
+    }
+
     private void initSourceData() {
         JsonSerializationSchema jsonSerializationSchema =
-                new JsonSerializationSchema(TEST_DATASET.getKey());
-        List<SeaTunnelRow> rows = TEST_DATASET.getValue();
+                new JsonSerializationSchema(testDateSet.getKey());
+        List<SeaTunnelRow> rows = testDateSet.getValue();
         for (int i = 0; i < rows.size(); i++) {
             jedis.set("key_test" + i, new 
String(jsonSerializationSchema.serialize(rows.get(i))));
         }
@@ -111,7 +122,7 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         jedis.select(0);
     }
 
-    private static Pair<SeaTunnelRowType, List<SeaTunnelRow>> 
generateTestDataSet() {
+    protected Pair<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() 
{
         SeaTunnelRowType rowType =
                 new SeaTunnelRowType(
                         new String[] {
@@ -177,7 +188,7 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
 
     private void initJedis() {
         Jedis jedis = new Jedis(redisContainer.getHost(), 
redisContainer.getFirstMappedPort());
-        jedis.auth(PASSWORD);
+        jedis.auth(password);
         jedis.ping();
         this.jedis = jedis;
     }
@@ -351,4 +362,6 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         jedis.del("key_multi_list");
         jedis.select(0);
     }
+
+    public abstract RedisContainerInfo getRedisContainerInfo();
 }

Reply via email to