This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 77fbbbd0ee [Improve][Connector-v2][Redis] Redis support select db
(#5570)
77fbbbd0ee is described below
commit 77fbbbd0ee742c1c23b7a10a1b0ed3af3b457b4a
Author: xiaofan2012 <[email protected]>
AuthorDate: Tue Oct 17 15:10:41 2023 +0800
[Improve][Connector-v2][Redis] Redis support select db (#5570)
* redis select db #5415
* redis select db #5415
* add test
* redis db_num backup
* fix test fail
* update
---------
Co-authored-by: xiaofan2022 <[email protected]>
---
docs/en/connector-v2/sink/Redis.md | 5 +++
docs/en/connector-v2/source/Redis.md | 5 +++
.../seatunnel/redis/config/RedisConfig.java | 7 +++
.../seatunnel/redis/config/RedisParameters.java | 10 ++++-
.../seatunnel/e2e/connector/redis/RedisIT.java | 17 ++++++++
.../test/resources/redis-to-redis-by-db-num.conf | 51 ++++++++++++++++++++++
6 files changed, 94 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index 7d2ef237e1..f91e6bc6ec 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -20,6 +20,7 @@ Used to write data to Redis.
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
+| db_num | int | no | 0 |
| mode | string | no | single |
| nodes | list | yes when mode=cluster | - |
| format | string | no | json |
@@ -91,6 +92,10 @@ redis authentication user, you need it when you connect to
an encrypted cluster
Redis authentication password, you need it when you connect to an encrypted
cluster
+### db_num [int]
+
+Redis database index ID. It is connected to db 0 by default
+
### mode [string]
redis mode, `single` or `cluster`, default is `single`
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index fa4996b0e3..3029f8061d 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -25,6 +25,7 @@ Used to read data from Redis.
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
+| db_num | int | no | 0 |
| mode | string | no | single |
| hash_key_parse_mode | string | no | all |
| nodes | list | yes when mode=cluster | - |
@@ -151,6 +152,10 @@ redis authentication user, you need it when you connect to
an encrypted cluster
redis authentication password, you need it when you connect to an encrypted
cluster
+### db_num [int]
+
+Redis database index ID. It is connected to db 0 by default
+
### mode [string]
redis mode, `single` or `cluster`, default is `single`
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 511cbe4aa9..7b0c20cbea 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -48,6 +48,13 @@ public class RedisConfig {
.withDescription(
"redis authentication password, you need it when
you connect to an encrypted cluster");
+ public static final Option<Integer> DB_NUM =
+ Options.key("db_num")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Redis database index id, it is connected to db 0
by default");
+
public static final Option<String> USER =
Options.key("user")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 8954b4da2a..b1922263cf 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -40,6 +40,7 @@ public class RedisParameters implements Serializable {
private String host;
private int port;
private String auth = "";
+ private int dbNum;
private String user = "";
private String keysPattern;
private String keyField;
@@ -58,6 +59,10 @@ public class RedisParameters implements Serializable {
if (config.hasPath(RedisConfig.AUTH.key())) {
this.auth = config.getString(RedisConfig.AUTH.key());
}
+ // set db_num
+ if (config.hasPath(RedisConfig.DB_NUM.key())) {
+ this.dbNum = config.getInt(RedisConfig.DB_NUM.key());
+ }
// set user
if (config.hasPath(RedisConfig.USER.key())) {
this.user = config.getString(RedisConfig.USER.key());
@@ -115,6 +120,7 @@ public class RedisParameters implements Serializable {
if (StringUtils.isNotBlank(user)) {
jedis.aclSetUser(user);
}
+ jedis.select(dbNum);
return jedis;
case CLUSTER:
HashSet<HostAndPort> nodes = new HashSet<>();
@@ -148,7 +154,9 @@ public class RedisParameters implements Serializable {
} else {
jedisCluster = new JedisCluster(nodes);
}
- return new JedisWrapper(jedisCluster);
+ JedisWrapper jedisWrapper = new JedisWrapper(jedisCluster);
+ jedisWrapper.select(dbNum);
+ return jedisWrapper;
default:
// do nothing
throw new RedisConnectorException(
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index bd4a9063ba..2a2feb7744 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -98,6 +98,13 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
for (int i = 0; i < rows.size(); i++) {
jedis.set("key_test" + i, new
String(jsonSerializationSchema.serialize(rows.get(i))));
}
+ // db_1 init data
+ jedis.select(1);
+ for (int i = 0; i < rows.size(); i++) {
+ jedis.set("key_test" + i, new
String(jsonSerializationSchema.serialize(rows.get(i))));
+ }
+ // db_num backup
+ jedis.select(0);
}
private static Pair<SeaTunnelRowType, List<SeaTunnelRow>>
generateTestDataSet() {
@@ -203,4 +210,14 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
Thread.sleep(60 * 1000);
Assertions.assertEquals(0, jedis.llen("key_list"));
}
+
+ @TestTemplate
+ public void restRedisDbNum(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/redis-to-redis-by-db-num.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ jedis.select(2);
+ Assertions.assertEquals(100, jedis.llen("db_test"));
+ jedis.del("db_test");
+ jedis.select(0);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
new file mode 100644
index 0000000000..a14bc2ab9f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = key
+ db_num=1
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "db_test"
+ data_type = list
+ db_num=2
+ }
+}
\ No newline at end of file