This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 77fbbbd0ee [Improve][Connector-v2][Redis] Redis support select db 
(#5570)
77fbbbd0ee is described below

commit 77fbbbd0ee742c1c23b7a10a1b0ed3af3b457b4a
Author: xiaofan2012 <[email protected]>
AuthorDate: Tue Oct 17 15:10:41 2023 +0800

    [Improve][Connector-v2][Redis] Redis support select db (#5570)
    
    * redis select db #5415
    
    * redis select db #5415
    
    * add test
    
    * redis db_num backup
    
    * fix test fail
    
    * update
    
    ---------
    
    Co-authored-by: xiaofan2022 <[email protected]>
---
 docs/en/connector-v2/sink/Redis.md                 |  5 +++
 docs/en/connector-v2/source/Redis.md               |  5 +++
 .../seatunnel/redis/config/RedisConfig.java        |  7 +++
 .../seatunnel/redis/config/RedisParameters.java    | 10 ++++-
 .../seatunnel/e2e/connector/redis/RedisIT.java     | 17 ++++++++
 .../test/resources/redis-to-redis-by-db-num.conf   | 51 ++++++++++++++++++++++
 6 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index 7d2ef237e1..f91e6bc6ec 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -20,6 +20,7 @@ Used to write data to Redis.
 | data_type      | string | yes                   | -             |
 | user           | string | no                    | -             |
 | auth           | string | no                    | -             |
+| db_num         | int    | no                    | 0             |
 | mode           | string | no                    | single        |
 | nodes          | list   | yes when mode=cluster | -             |
 | format         | string | no                    | json          |
@@ -91,6 +92,10 @@ redis authentication user, you need it when you connect to 
an encrypted cluster
 
 Redis authentication password, you need it when you connect to an encrypted 
cluster
 
+### db_num [int]
+
+Redis database index ID. It is connected to db 0 by default
+
 ### mode [string]
 
 redis mode, `single` or `cluster`, default is `single`
diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
index fa4996b0e3..3029f8061d 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -25,6 +25,7 @@ Used to read data from Redis.
 | data_type           | string | yes                   | -             |
 | user                | string | no                    | -             |
 | auth                | string | no                    | -             |
+| db_num              | int    | no                    | 0             |
 | mode                | string | no                    | single        |
 | hash_key_parse_mode | string | no                    | all           |
 | nodes               | list   | yes when mode=cluster | -             |
@@ -151,6 +152,10 @@ redis authentication user, you need it when you connect to 
an encrypted cluster
 
 redis authentication password, you need it when you connect to an encrypted 
cluster
 
+### db_num [int]
+
+Redis database index ID. It is connected to db 0 by default
+
 ### mode [string]
 
 redis mode, `single` or `cluster`, default is `single`
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index 511cbe4aa9..7b0c20cbea 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -48,6 +48,13 @@ public class RedisConfig {
                     .withDescription(
                             "redis authentication password, you need it when 
you connect to an encrypted cluster");
 
+    public static final Option<Integer> DB_NUM =
+            Options.key("db_num")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Redis  database index id, it is connected to db 0 
by default");
+
     public static final Option<String> USER =
             Options.key("user")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 8954b4da2a..b1922263cf 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -40,6 +40,7 @@ public class RedisParameters implements Serializable {
     private String host;
     private int port;
     private String auth = "";
+    private int dbNum;
     private String user = "";
     private String keysPattern;
     private String keyField;
@@ -58,6 +59,10 @@ public class RedisParameters implements Serializable {
         if (config.hasPath(RedisConfig.AUTH.key())) {
             this.auth = config.getString(RedisConfig.AUTH.key());
         }
+        // set db_num
+        if (config.hasPath(RedisConfig.DB_NUM.key())) {
+            this.dbNum = config.getInt(RedisConfig.DB_NUM.key());
+        }
         // set user
         if (config.hasPath(RedisConfig.USER.key())) {
             this.user = config.getString(RedisConfig.USER.key());
@@ -115,6 +120,7 @@ public class RedisParameters implements Serializable {
                 if (StringUtils.isNotBlank(user)) {
                     jedis.aclSetUser(user);
                 }
+                jedis.select(dbNum);
                 return jedis;
             case CLUSTER:
                 HashSet<HostAndPort> nodes = new HashSet<>();
@@ -148,7 +154,9 @@ public class RedisParameters implements Serializable {
                 } else {
                     jedisCluster = new JedisCluster(nodes);
                 }
-                return new JedisWrapper(jedisCluster);
+                JedisWrapper jedisWrapper = new JedisWrapper(jedisCluster);
+                jedisWrapper.select(dbNum);
+                return jedisWrapper;
             default:
                 // do nothing
                 throw new RedisConnectorException(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index bd4a9063ba..2a2feb7744 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -98,6 +98,13 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         for (int i = 0; i < rows.size(); i++) {
             jedis.set("key_test" + i, new 
String(jsonSerializationSchema.serialize(rows.get(i))));
         }
+        // db_1 init data
+        jedis.select(1);
+        for (int i = 0; i < rows.size(); i++) {
+            jedis.set("key_test" + i, new 
String(jsonSerializationSchema.serialize(rows.get(i))));
+        }
+        // db_num backup
+        jedis.select(0);
     }
 
     private static Pair<SeaTunnelRowType, List<SeaTunnelRow>> 
generateTestDataSet() {
@@ -203,4 +210,14 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         Thread.sleep(60 * 1000);
         Assertions.assertEquals(0, jedis.llen("key_list"));
     }
+
+    @TestTemplate
+    public void restRedisDbNum(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/redis-to-redis-by-db-num.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        jedis.select(2);
+        Assertions.assertEquals(100, jedis.llen("db_test"));
+        jedis.del("db_test");
+        jedis.select(0);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
new file mode 100644
index 0000000000..a14bc2ab9f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = key
+    db_num=1
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "db_test"
+    data_type = list
+    db_num=2
+  }
+}
\ No newline at end of file

Reply via email to