This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b5321ff1d2 [Feature][Connector-v2][RedisSink]Support redis to set
expiration time. (#4975)
b5321ff1d2 is described below
commit b5321ff1d2aa60309d46286158c158ff67359ae3
Author: lightzhao <[email protected]>
AuthorDate: Mon Aug 14 10:11:34 2023 +0800
[Feature][Connector-v2][RedisSink]Support redis to set expiration time.
(#4975)
* Support redis to set expiration time.
* Set redis expire default value.
* add e2e test.
* add e2e test.
* modify config file name.
---------
Co-authored-by: lightzhao <[email protected]>
---
docs/en/connector-v2/sink/Redis.md | 5 +++
.../seatunnel/redis/config/RedisConfig.java | 6 +++
.../seatunnel/redis/config/RedisDataType.java | 23 +++++++---
.../seatunnel/redis/config/RedisParameters.java | 4 ++
.../seatunnel/redis/sink/RedisSinkFactory.java | 3 +-
.../seatunnel/redis/sink/RedisSinkWriter.java | 3 +-
.../seatunnel/e2e/connector/redis/RedisIT.java | 11 +++++
.../src/test/resources/redis-to-redis-expire.conf | 50 ++++++++++++++++++++++
8 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index fcface7da2..7d2ef237e1 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -23,6 +23,7 @@ Used to write data to Redis.
| mode | string | no | single |
| nodes | list | yes when mode=cluster | - |
| format | string | no | json |
+| expire | long | no | -1 |
| common-options | | no | - |
### host [string]
@@ -120,6 +121,10 @@ Connector will generate data as the following and write it
to redis:
```
+### expire [long]
+
+Set redis expiration time, the unit is second. The default value is -1, keys
do not automatically expire by default.
+
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index c777d23782..511cbe4aa9 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -102,6 +102,12 @@ public class RedisConfig {
.withDescription(
"hash key parse mode, support all or kv, default
value is all");
+ public static final Option<Long> EXPIRE =
+ Options.key("expire")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription("Set redis expiration time.");
+
public enum Format {
JSON,
// TEXT will be supported later
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index 64772b5381..a315e0cdae 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -30,8 +30,9 @@ import java.util.Set;
public enum RedisDataType {
KEY {
@Override
- public void set(Jedis jedis, String key, String value) {
+ public void set(Jedis jedis, String key, String value, long expire) {
jedis.set(key, value);
+ expire(jedis, key, expire);
}
@Override
@@ -41,9 +42,10 @@ public enum RedisDataType {
},
HASH {
@Override
- public void set(Jedis jedis, String key, String value) {
+ public void set(Jedis jedis, String key, String value, long expire) {
Map<String, String> fieldsMap = JsonUtils.toMap(value);
jedis.hset(key, fieldsMap);
+ expire(jedis, key, expire);
}
@Override
@@ -54,8 +56,9 @@ public enum RedisDataType {
},
LIST {
@Override
- public void set(Jedis jedis, String key, String value) {
+ public void set(Jedis jedis, String key, String value, long expire) {
jedis.lpush(key, value);
+ expire(jedis, key, expire);
}
@Override
@@ -65,8 +68,9 @@ public enum RedisDataType {
},
SET {
@Override
- public void set(Jedis jedis, String key, String value) {
+ public void set(Jedis jedis, String key, String value, long expire) {
jedis.sadd(key, value);
+ expire(jedis, key, expire);
}
@Override
@@ -77,8 +81,9 @@ public enum RedisDataType {
},
ZSET {
@Override
- public void set(Jedis jedis, String key, String value) {
+ public void set(Jedis jedis, String key, String value, long expire) {
jedis.zadd(key, 1, value);
+ expire(jedis, key, expire);
}
@Override
@@ -91,7 +96,13 @@ public enum RedisDataType {
return Collections.emptyList();
}
- public void set(Jedis jedis, String key, String value) {
+ private static void expire(Jedis jedis, String key, long expire) {
+ if (expire > 0) {
+ jedis.expire(key, expire);
+ }
+ }
+
+ public void set(Jedis jedis, String key, String value, long expire) {
// do nothing
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index c8bb879d0f..8954b4da2a 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -47,6 +47,7 @@ public class RedisParameters implements Serializable {
private RedisConfig.RedisMode mode;
private RedisConfig.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
+ private long expire = RedisConfig.EXPIRE.defaultValue();
public void buildWithConfig(Config config) {
// set host
@@ -89,6 +90,9 @@ public class RedisParameters implements Serializable {
if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
}
+ if (config.hasPath(RedisConfig.EXPIRE.key())) {
+ this.expire = config.getLong(RedisConfig.EXPIRE.key());
+ }
// set redis data type
try {
String dataType = config.getString(RedisConfig.DATA_TYPE.key());
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
index e68a893f79..22ae156874 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java
@@ -41,7 +41,8 @@ public class RedisSinkFactory implements TableSinkFactory {
RedisConfig.AUTH,
RedisConfig.USER,
RedisConfig.KEY_PATTERN,
- RedisConfig.FORMAT)
+ RedisConfig.FORMAT,
+ RedisConfig.EXPIRE)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER,
RedisConfig.NODES)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 657e3aaa56..80b1449b9d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -59,7 +59,8 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
} else {
key = keyField;
}
- redisDataType.set(jedis, key, data);
+ long expire = redisParameters.getExpire();
+ redisDataType.set(jedis, key, data, expire);
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 808f686033..bd4a9063ba 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -192,4 +192,15 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
jedis.del("key_list");
Assertions.assertEquals(0, jedis.llen("key_list"));
}
+
+ @TestTemplate
+ public void testRedisWithExpire(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/redis-to-redis-expire.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(100, jedis.llen("key_list"));
+ // Clear data to prevent data duplication in the next TestContainer
+ Thread.sleep(60 * 1000);
+ Assertions.assertEquals(0, jedis.llen("key_list"));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
new file mode 100644
index 0000000000..4a42bd3a46
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ keys = "key_test*"
+ data_type = key
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key_list"
+ data_type = list
+ expire = 30
+ }
+}
\ No newline at end of file