This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6e70cbe334 [improve][Redis]Redis scan command supports versions 5, 6,
7 (#7666)
6e70cbe334 is described below
commit 6e70cbe3340353d33a4c191f0067cdecfeb41fe3
Author: FuYouJ <[email protected]>
AuthorDate: Sat Sep 21 22:07:49 2024 +0800
[improve][Redis]Redis scan command supports versions 5, 6, 7 (#7666)
---
docs/en/connector-v2/source/Redis.md | 57 +++++++++++++++-------
release-note.md | 3 +-
.../seatunnel/redis/client/RedisClient.java | 35 ++++++++++++-
.../seatunnel/redis/client/RedisClusterClient.java | 4 +-
.../seatunnel/redis/client/RedisSingleClient.java | 4 +-
.../seatunnel/redis/config/RedisParameters.java | 47 +++++++++++++++---
.../seatunnel/redis/exception/RedisErrorCode.java | 42 ++++++++++++++++
.../seatunnel/redis/source/RedisSourceReader.java | 6 +--
.../seatunnel/e2e/connector/redis/Redis5IT.java | 25 ++++++++++
.../seatunnel/e2e/connector/redis/Redis7IT.java | 25 ++++++++++
.../e2e/connector/redis/RedisContainerInfo.java | 47 ++++++++++++++++++
.../{RedisIT.java => RedisTestCaseTemplateIT.java} | 47 +++++++++++-------
12 files changed, 291 insertions(+), 51 deletions(-)
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 9f4e86c8fb..bd60830ba3 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -17,8 +17,8 @@ Used to read data from Redis.
## Options
-| name | type | required | default value |
-|---------------------|--------|-----------------------|---------------|
+| name | type | required | default value |
+| ------------------- | ------ | --------------------- | ------------- |
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
@@ -67,7 +67,6 @@ for example, if the value of hash key is the following shown:
if hash_key_parse_mode is `all` and schema config as the following shown, it
will generate the following data:
```hocon
-
schema {
fields {
001 {
@@ -83,14 +82,13 @@ schema {
```
-| 001 | 002 |
-|---------------------------------|---------------------------|
+| 001 | 002 |
+| ------------------------------- | ------------------------- |
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |
if hash_key_parse_mode is `kv` and schema config as the following shown, it
will generate the following data:
```hocon
-
schema {
fields {
hash_key = string
@@ -101,10 +99,10 @@ schema {
```
-| hash_key | name | age |
-|----------|---------------|-----|
-| 001 | tyrantlucifer | 26 |
-| 002 | Zongwen | 26 |
+| hash_key | name | age |
+| -------- | ------------- | ---- |
+| 001 | tyrantlucifer | 26 |
+| 002 | Zongwen | 26 |
each kv that in hash key it will be treated as a row and send it to upstream.
@@ -180,7 +178,6 @@ when you assign format is `json`, you should also assign
schema option, for exam
upstream data is the following:
```json
-
{"code": 200, "data": "get success", "success": true}
```
@@ -188,7 +185,6 @@ upstream data is the following:
you should assign schema as the following:
```hocon
-
schema {
fields {
code = int
@@ -201,8 +197,8 @@ schema {
connector will generate data as the following:
-| code | data | success |
-|------|-------------|---------|
+| code | data | success |
+| ---- | ----------- | ------- |
| 200 | get success | true |
when you assign format is `text`, connector will do nothing for upstream data,
for example:
@@ -210,15 +206,14 @@ when you assign format is `text`, connector will do
nothing for upstream data, f
upstream data is the following:
```json
-
{"code": 200, "data": "get success", "success": true}
```
connector will generate data as the following:
-| content |
-|----------------------------------------------------------|
+| content |
+| -------------------------------------------------------- |
| {"code": 200, "data": "get success", "success": true} |
### schema [config]
@@ -261,6 +256,32 @@ Redis {
}
```
+read string type keys write append to list
+
+```hocon
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "string_test*"
+ data_type = string
+ batch_size = 33
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "string_test_list"
+ data_type = list
+ batch_size = 33
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
@@ -270,4 +291,4 @@ Redis {
### next version
- [Improve] Support redis cluster mode connection and user authentication
[3188](https://github.com/apache/seatunnel/pull/3188)
-
+- [Bug] Redis scan command supports versions 5, 6, 7
[7666](https://github.com/apache/seatunnel/pull/7666)
\ No newline at end of file
diff --git a/release-note.md b/release-note.md
index 4ed0d51fed..6147093eee 100644
--- a/release-note.md
+++ b/release-note.md
@@ -58,7 +58,7 @@
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that
mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support
batchWrite in single mode(#7030,#7085)
- [Connector-V2] [Clickhouse] Add a new optional configuration
`clickhouse.config` to the source connector of ClickHouse (#7143)
-- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
+- [Connector-V2] [Redis] Redis scan command supports versions 3, 4, 5, 6, 7
(#7666)
### Zeta(ST-Engine)
@@ -200,6 +200,7 @@
- [Connector-V2] [Assert] Support field type assert and field value equality
assert for full data types (#6275)
- [Connector-V2] [Iceberg] Support iceberg sink #6198
- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
+- [Connector-V2] [ElasticsSource] Source support multiSource (#6730)
### Zeta(ST-Engine)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index 5730838cca..109c51f78a 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -24,6 +24,7 @@ import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,14 +33,19 @@ public abstract class RedisClient extends Jedis {
protected final RedisParameters redisParameters;
+ private final Integer redisVersion;
+
protected final int batchSize;
protected final Jedis jedis;
- protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
+ private static final int REDIS_5 = 5;
+
+ protected RedisClient(RedisParameters redisParameters, Jedis jedis, int
redisVersion) {
this.redisParameters = redisParameters;
this.batchSize = redisParameters.getBatchSize();
this.jedis = jedis;
+ this.redisVersion = redisVersion;
}
public ScanResult<String> scanKeys(
@@ -47,7 +53,32 @@ public abstract class RedisClient extends Jedis {
ScanParams scanParams = new ScanParams();
scanParams.match(keysPattern);
scanParams.count(batchSize);
- return jedis.scan(cursor, scanParams, type.name());
+ return scanByRedisVersion(cursor, scanParams, type, redisVersion);
+ }
+
+ private ScanResult<String> scanByRedisVersion(
+ String cursor, ScanParams scanParams, RedisDataType type, Integer
redisVersion) {
+ if (redisVersion <= REDIS_5) {
+ return scanOnRedis5(cursor, scanParams, type);
+ } else {
+ return jedis.scan(cursor, scanParams, type.name());
+ }
+ }
+
+ // When the version is earlier than redis5, scan command does not support
type
+ private ScanResult<String> scanOnRedis5(
+ String cursor, ScanParams scanParams, RedisDataType type) {
+ ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+ String resultCursor = scanResult.getCursor();
+ List<String> keys = scanResult.getResult();
+ List<String> typeKeys = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ String keyType = jedis.type(key);
+ if (type.name().equalsIgnoreCase(keyType)) {
+ typeKeys.add(key);
+ }
+ }
+ return new ScanResult<>(resultCursor, typeKeys);
}
public abstract List<String> batchGetString(List<String> keys);
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index 13acc89def..bd687e6c9b 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -30,8 +30,8 @@ import java.util.Map;
import java.util.Set;
public class RedisClusterClient extends RedisClient {
- public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
- super(redisParameters, jedis);
+ public RedisClusterClient(RedisParameters redisParameters, Jedis jedis,
int redisVersion) {
+ super(redisParameters, jedis, redisVersion);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index 99bae5e733..f79aa46e98 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -34,8 +34,8 @@ import java.util.Set;
// In standalone mode, pipeline can be used to improve batch read performance
public class RedisSingleClient extends RedisClient {
- public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) {
- super(redisParameters, jedis);
+ public RedisSingleClient(RedisParameters redisParameters, Jedis jedis, int
redisVersion) {
+ super(redisParameters, jedis, redisVersion);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index ef1fe104d7..3d7e954f1d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient;
import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorE
import org.apache.commons.lang3.StringUtils;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
@@ -37,7 +38,11 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_VERSION_INFO_FAILED;
+import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.INVALID_CONFIG;
+
@Data
+@Slf4j
public class RedisParameters implements Serializable {
private String host;
private int port;
@@ -53,6 +58,8 @@ public class RedisParameters implements Serializable {
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
+ private int redisVersion;
+
public void buildWithConfig(ReadonlyConfig config) {
// set host
this.host = config.get(RedisConfig.HOST);
@@ -94,11 +101,40 @@ public class RedisParameters implements Serializable {
public RedisClient buildRedisClient() {
Jedis jedis = this.buildJedis();
+ this.redisVersion = extractRedisVersion(jedis);
if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
- return new RedisSingleClient(this, jedis);
+ return new RedisSingleClient(this, jedis, redisVersion);
} else {
- return new RedisClusterClient(this, jedis);
+ return new RedisClusterClient(this, jedis, redisVersion);
+ }
+ }
+
+ private int extractRedisVersion(Jedis jedis) {
+ log.info("Try to get redis version information from the jedis.info()
method");
+ // # Server
+ // redis_version:5.0.14
+ // redis_git_sha1:00000000
+ // redis_git_dirty:0
+ String info = jedis.info();
+ try {
+ for (String line : info.split("\n")) {
+ if (line.startsWith("redis_version:")) {
+ // 5.0.14
+ String versionInfo = line.split(":")[1].trim();
+ log.info("The version of Redis is :{}", versionInfo);
+ String[] parts = versionInfo.split("\\.");
+ return Integer.parseInt(parts[0]);
+ }
+ }
+ } catch (Exception e) {
+ throw new RedisConnectorException(
+ GET_REDIS_VERSION_INFO_FAILED,
+ GET_REDIS_VERSION_INFO_FAILED.getErrorMessage(),
+ e);
}
+ throw new RedisConnectorException(
+ GET_REDIS_VERSION_INFO_FAILED,
+ "Did not get the expected redis_version from the jedis.info()
method");
}
public Jedis buildJedis() {
@@ -122,7 +158,7 @@ public class RedisParameters implements Serializable {
String[] splits = redisNode.split(":");
if (splits.length != 2) {
throw new RedisConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+ INVALID_CONFIG,
"Invalid redis node information,"
+ "redis node information must
like as the following: [host:port]");
}
@@ -151,8 +187,7 @@ public class RedisParameters implements Serializable {
default:
// do nothing
throw new RedisConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "Not support this redis mode");
+ CommonErrorCode.OPERATION_NOT_SUPPORTED, "Not support
this redis mode");
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
new file mode 100644
index 0000000000..4d9bb745bd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -0,0 +1,42 @@
+package org.apache.seatunnel.connectors.seatunnel.redis.exception;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum RedisErrorCode implements SeaTunnelErrorCode {
+ GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the
redis version"),
+ INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");
+
+ private final String code;
+ private final String description;
+
+ RedisErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index 10825be8bc..be67c266f9 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -80,9 +80,6 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
redisClient.scanKeys(cursor, batchSize, keysPattern,
redisDataType);
cursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
- if (CollectionUtils.isEmpty(keys)) {
- break;
- }
pollNext(keys, redisDataType, output);
// when cursor return "0", scan end
if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
@@ -94,6 +91,9 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
private void pollNext(List<String> keys, RedisDataType dataType,
Collector<SeaTunnelRow> output)
throws IOException {
+ if (CollectionUtils.isEmpty(keys)) {
+ return;
+ }
if (RedisDataType.HASH.equals(dataType)) {
pollHashMapToNext(keys, output);
return;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
new file mode 100644
index 0000000000..6df6e80d6e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class Redis5IT extends RedisTestCaseTemplateIT {
+
+ @Override
+ public RedisContainerInfo getRedisContainerInfo() {
+ return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel",
"redis:5");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
new file mode 100644
index 0000000000..dfa46e886a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class Redis7IT extends RedisTestCaseTemplateIT {
+
+ @Override
+ public RedisContainerInfo getRedisContainerInfo() {
+ return new RedisContainerInfo("redis-e2e", 6379, "SeaTunnel",
"redis:7");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
new file mode 100644
index 0000000000..61b55a6594
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+public class RedisContainerInfo {
+ private final String host;
+ private final int port;
+ private final String password;
+ private final String imageName;
+
+ public RedisContainerInfo(String host, int port, String password, String
imageName) {
+ this.host = host;
+ this.port = port;
+ this.password = password;
+ this.imageName = imageName;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public String getImageName() {
+ return imageName;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
similarity index 91%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 280ca94faf..66288bbb15 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.seatunnel.e2e.connector.redis;
import org.apache.seatunnel.api.table.type.ArrayType;
@@ -63,14 +62,15 @@ import java.util.Objects;
import java.util.stream.Stream;
@Slf4j
-public class RedisIT extends TestSuiteBase implements TestResource {
- private static final String IMAGE = "redis:latest";
- private static final String HOST = "redis-e2e";
- private static final int PORT = 6379;
- private static final String PASSWORD = "SeaTunnel";
+public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements
TestResource {
+
+ private String host;
+ private int port;
+ private String password;
+
+ private String imageName;
- private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>>
TEST_DATASET =
- generateTestDataSet();
+ private Pair<SeaTunnelRowType, List<SeaTunnelRow>> testDateSet;
private GenericContainer<?> redisContainer;
@@ -79,13 +79,15 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
@BeforeAll
@Override
public void startUp() {
+ initContainerInfo();
this.redisContainer =
- new GenericContainer<>(DockerImageName.parse(IMAGE))
+ new GenericContainer<>(DockerImageName.parse(imageName))
.withNetwork(NETWORK)
- .withNetworkAliases(HOST)
- .withExposedPorts(PORT)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
- .withCommand(String.format("redis-server --requirepass
%s", PASSWORD))
+ .withNetworkAliases(host)
+ .withExposedPorts(port)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName)))
+ .withCommand(String.format("redis-server --requirepass
%s", password))
.waitingFor(
new HostPortWaitStrategy()
.withStartupTimeout(Duration.ofMinutes(2)));
@@ -95,10 +97,19 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
this.initSourceData();
}
+ private void initContainerInfo() {
+ RedisContainerInfo redisContainerInfo = getRedisContainerInfo();
+ this.host = redisContainerInfo.getHost();
+ this.port = redisContainerInfo.getPort();
+ this.password = redisContainerInfo.getPassword();
+ this.imageName = redisContainerInfo.getImageName();
+ this.testDateSet = generateTestDataSet();
+ }
+
private void initSourceData() {
JsonSerializationSchema jsonSerializationSchema =
- new JsonSerializationSchema(TEST_DATASET.getKey());
- List<SeaTunnelRow> rows = TEST_DATASET.getValue();
+ new JsonSerializationSchema(testDateSet.getKey());
+ List<SeaTunnelRow> rows = testDateSet.getValue();
for (int i = 0; i < rows.size(); i++) {
jedis.set("key_test" + i, new
String(jsonSerializationSchema.serialize(rows.get(i))));
}
@@ -111,7 +122,7 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
jedis.select(0);
}
- private static Pair<SeaTunnelRowType, List<SeaTunnelRow>>
generateTestDataSet() {
+ protected Pair<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet()
{
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {
@@ -177,7 +188,7 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
private void initJedis() {
Jedis jedis = new Jedis(redisContainer.getHost(),
redisContainer.getFirstMappedPort());
- jedis.auth(PASSWORD);
+ jedis.auth(password);
jedis.ping();
this.jedis = jedis;
}
@@ -351,4 +362,6 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
jedis.del("key_multi_list");
jedis.select(0);
}
+
+ public abstract RedisContainerInfo getRedisContainerInfo();
}