This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5321ff1d2 [Feature][Connector-v2][RedisSink]Support redis to set 
expiration time. (#4975)
b5321ff1d2 is described below

commit b5321ff1d2aa60309d46286158c158ff67359ae3
Author: lightzhao <[email protected]>
AuthorDate: Mon Aug 14 10:11:34 2023 +0800

    [Feature][Connector-v2][RedisSink]Support redis to set expiration time. 
(#4975)
    
    * Support redis to set expiration time.
    
    * Set redis expire default value.
    
    * add e2e test.
    
    * add e2e test.
    
    * modify config file name.
    
    ---------
    
    Co-authored-by: lightzhao <[email protected]>
---
 docs/en/connector-v2/sink/Redis.md                 |  5 +++
 .../seatunnel/redis/config/RedisConfig.java        |  6 +++
 .../seatunnel/redis/config/RedisDataType.java      | 23 +++++++---
 .../seatunnel/redis/config/RedisParameters.java    |  4 ++
 .../seatunnel/redis/sink/RedisSinkFactory.java     |  3 +-
 .../seatunnel/redis/sink/RedisSinkWriter.java      |  3 +-
 .../seatunnel/e2e/connector/redis/RedisIT.java     | 11 +++++
 .../src/test/resources/redis-to-redis-expire.conf  | 50 ++++++++++++++++++++++
 8 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index fcface7da2..7d2ef237e1 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -23,6 +23,7 @@ Used to write data to Redis.
 | mode           | string | no                    | single        |
 | nodes          | list   | yes when mode=cluster | -             |
 | format         | string | no                    | json          |
+| expire         | long   | no                    | -1            |
 | common-options |        | no                    | -             |
 
 ### host [string]
@@ -120,6 +121,10 @@ Connector will generate data as the following and write it 
to redis:
 
 ```
 
+### expire [long]
+
+Set the Redis key expiration time, in seconds. The default value is -1, which 
means keys do not expire automatically.
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index c777d23782..511cbe4aa9 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -102,6 +102,12 @@ public class RedisConfig {
                     .withDescription(
                             "hash key parse mode, support all or kv, default 
value is all");
 
+    public static final Option<Long> EXPIRE =
+            Options.key("expire")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription("Set redis expiration time.");
+
     public enum Format {
         JSON,
         // TEXT will be supported later
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index 64772b5381..a315e0cdae 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -30,8 +30,9 @@ import java.util.Set;
 public enum RedisDataType {
     KEY {
         @Override
-        public void set(Jedis jedis, String key, String value) {
+        public void set(Jedis jedis, String key, String value, long expire) {
             jedis.set(key, value);
+            expire(jedis, key, expire);
         }
 
         @Override
@@ -41,9 +42,10 @@ public enum RedisDataType {
     },
     HASH {
         @Override
-        public void set(Jedis jedis, String key, String value) {
+        public void set(Jedis jedis, String key, String value, long expire) {
             Map<String, String> fieldsMap = JsonUtils.toMap(value);
             jedis.hset(key, fieldsMap);
+            expire(jedis, key, expire);
         }
 
         @Override
@@ -54,8 +56,9 @@ public enum RedisDataType {
     },
     LIST {
         @Override
-        public void set(Jedis jedis, String key, String value) {
+        public void set(Jedis jedis, String key, String value, long expire) {
             jedis.lpush(key, value);
+            expire(jedis, key, expire);
         }
 
         @Override
@@ -65,8 +68,9 @@ public enum RedisDataType {
     },
     SET {
         @Override
-        public void set(Jedis jedis, String key, String value) {
+        public void set(Jedis jedis, String key, String value, long expire) {
             jedis.sadd(key, value);
+            expire(jedis, key, expire);
         }
 
         @Override
@@ -77,8 +81,9 @@ public enum RedisDataType {
     },
     ZSET {
         @Override
-        public void set(Jedis jedis, String key, String value) {
+        public void set(Jedis jedis, String key, String value, long expire) {
             jedis.zadd(key, 1, value);
+            expire(jedis, key, expire);
         }
 
         @Override
@@ -91,7 +96,13 @@ public enum RedisDataType {
         return Collections.emptyList();
     }
 
-    public void set(Jedis jedis, String key, String value) {
+    private static void expire(Jedis jedis, String key, long expire) {
+        if (expire > 0) {
+            jedis.expire(key, expire);
+        }
+    }
+
+    public void set(Jedis jedis, String key, String value, long expire) {
         // do nothing
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index c8bb879d0f..8954b4da2a 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -47,6 +47,7 @@ public class RedisParameters implements Serializable {
     private RedisConfig.RedisMode mode;
     private RedisConfig.HashKeyParseMode hashKeyParseMode;
     private List<String> redisNodes = Collections.emptyList();
+    private long expire = RedisConfig.EXPIRE.defaultValue();
 
     public void buildWithConfig(Config config) {
         // set host
@@ -89,6 +90,9 @@ public class RedisParameters implements Serializable {
         if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
             this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
         }
+        if (config.hasPath(RedisConfig.EXPIRE.key())) {
+            this.expire = config.getLong(RedisConfig.EXPIRE.key());
+        }
         // set redis data type
         try {
             String dataType = config.getString(RedisConfig.DATA_TYPE.key());
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index e68a893f79..22ae156874 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -41,7 +41,8 @@ public class RedisSinkFactory implements TableSinkFactory {
                         RedisConfig.AUTH,
                         RedisConfig.USER,
                         RedisConfig.KEY_PATTERN,
-                        RedisConfig.FORMAT)
+                        RedisConfig.FORMAT,
+                        RedisConfig.EXPIRE)
                 .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, 
RedisConfig.NODES)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 657e3aaa56..80b1449b9d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -59,7 +59,8 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         } else {
             key = keyField;
         }
-        redisDataType.set(jedis, key, data);
+        long expire = redisParameters.getExpire();
+        redisDataType.set(jedis, key, data, expire);
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 808f686033..bd4a9063ba 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -192,4 +192,15 @@ public class RedisIT extends TestSuiteBase implements 
TestResource {
         jedis.del("key_list");
         Assertions.assertEquals(0, jedis.llen("key_list"));
     }
+
+    @TestTemplate
+    public void testRedisWithExpire(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
container.executeJob("/redis-to-redis-expire.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, jedis.llen("key_list"));
+        // Wait past the 30s expire so key_list is gone before the next TestContainer
+        Thread.sleep(60 * 1000);
+        Assertions.assertEquals(0, jedis.llen("key_list"));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
new file mode 100644
index 0000000000..4a42bd3a46
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  shade.identifier = "base64"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    keys = "key_test*"
+    data_type = key
+  }
+}
+
+sink {
+  Redis {
+    host = "redis-e2e"
+    port = 6379
+    auth = "U2VhVHVubmVs"
+    key = "key_list"
+    data_type = list
+    expire = 30
+  }
+}
\ No newline at end of file

Reply via email to