This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new be37f05c07 [Improve][Redis] Redis reader use scan cammnd instead of
keys, single mode reader/writer support batch (#7087)
be37f05c07 is described below
commit be37f05c07eecccdb3805545080d1b61a6c10755
Author: FuYouJ <[email protected]>
AuthorDate: Wed Jul 3 22:17:10 2024 +0800
[Improve][Redis] Redis reader use scan cammnd instead of keys, single mode
reader/writer support batch (#7087)
---
docs/en/connector-v2/sink/Redis.md | 5 +
docs/en/connector-v2/source/Redis.md | 5 +
release-note.md | 1 +
.../seatunnel/redis/client/RedisClient.java | 77 ++++++++
.../seatunnel/redis/client/RedisClusterClient.java | 138 +++++++++++++
.../seatunnel/redis/client/RedisSingleClient.java | 216 +++++++++++++++++++++
.../seatunnel/redis/config/RedisConfig.java | 10 +-
.../seatunnel/redis/config/RedisDataType.java | 12 ++
.../seatunnel/redis/config/RedisParameters.java | 15 ++
.../seatunnel/redis/sink/RedisSinkWriter.java | 65 ++++++-
.../seatunnel/redis/source/RedisSourceReader.java | 162 ++++++++++++----
.../seatunnel/e2e/connector/redis/RedisIT.java | 113 +++++++++++
.../test/resources/redis-to-redis-by-db-num.conf | 2 +
.../src/test/resources/redis-to-redis-expire.conf | 1 +
.../src/test/resources/redis-to-redis.conf | 4 +-
...onf => scan-hash-to-redis-list-hash-check.conf} | 10 +-
...n-list-test-read-to-redis-list-test-check.conf} | 10 +-
....conf => scan-set-to-redis-list-set-check.conf} | 10 +-
...is-by-db-num.conf => scan-string-to-redis.conf} | 10 +-
...onf => scan-zset-to-redis-list-zset-check.conf} | 10 +-
20 files changed, 806 insertions(+), 70 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index f91e6bc6ec..ac4cd55cc4 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -18,6 +18,7 @@ Used to write data to Redis.
| port | int | yes | - |
| key | string | yes | - |
| data_type | string | yes | - |
+| batch_size | int | no | 10 |
| user | string | no | - |
| auth | string | no | - |
| db_num | int | no | 0 |
@@ -83,6 +84,10 @@ Redis data types, support `key` `hash` `list` `set` `zset`
- zset
> Each data from upstream will be added to the configured zset key with a
> weight of 1. So the order of data in zset is based on the order of data
> consumption.
+>
+ ### batch_size [int]
+
+ensure the batch write size in single-machine mode; no guarantees in cluster
mode.
### user [string]
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 3029f8061d..9af103f884 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -22,6 +22,7 @@ Used to read data from Redis.
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
+| batch_size | int | yes | 10 |
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
@@ -113,6 +114,10 @@ each kv that in hash key it will be treated as a row and
send it to upstream.
keys pattern
+### batch_size [int]
+
+indicates the number of keys to attempt to return per iteration,default 10
+
**Tips:Redis source connector support fuzzy key matching, user needs to ensure
that the matched keys are the same type**
### data_type [string]
diff --git a/release-note.md b/release-note.md
index 24455c40ac..0b5e884572 100644
--- a/release-note.md
+++ b/release-note.md
@@ -56,6 +56,7 @@
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that
mongodb saved it as numeric internally (#6997)
+- [Connector-v2] [Redis] Using scan replace keys operation command,support
batchWrite in single mode(#7030,#7085)
### Zeta(ST-Engine)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
new file mode 100644
index 0000000000..5730838cca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class RedisClient extends Jedis {
+
+ protected final RedisParameters redisParameters;
+
+ protected final int batchSize;
+
+ protected final Jedis jedis;
+
+ protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
+ this.redisParameters = redisParameters;
+ this.batchSize = redisParameters.getBatchSize();
+ this.jedis = jedis;
+ }
+
+ public ScanResult<String> scanKeys(
+ String cursor, int batchSize, String keysPattern, RedisDataType
type) {
+ ScanParams scanParams = new ScanParams();
+ scanParams.match(keysPattern);
+ scanParams.count(batchSize);
+ return jedis.scan(cursor, scanParams, type.name());
+ }
+
+ public abstract List<String> batchGetString(List<String> keys);
+
+ public abstract List<List<String>> batchGetList(List<String> keys);
+
+ public abstract List<Set<String>> batchGetSet(List<String> keys);
+
+ public abstract List<Map<String, String>> batchGetHash(List<String> keys);
+
+ public abstract List<List<String>> batchGetZset(List<String> keys);
+
+ public abstract void batchWriteString(
+ List<String> keys, List<String> values, long expireSeconds);
+
+ public abstract void batchWriteList(
+ List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+
+ public abstract void batchWriteSet(
+ List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+
+ public abstract void batchWriteHash(
+ List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+
+ public abstract void batchWriteZset(
+ List<String> keyBuffer, List<String> valueBuffer, long
expireSeconds);
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
new file mode 100644
index 0000000000..13acc89def
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RedisClusterClient extends RedisClient {
+ public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
+ super(redisParameters, jedis);
+ }
+
+ @Override
+ public List<String> batchGetString(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<String> result = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ result.add(jedis.get(key));
+ }
+ return result;
+ }
+
+ @Override
+ public List<List<String>> batchGetList(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<List<String>> result = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ result.add(jedis.lrange(key, 0, -1));
+ }
+ return result;
+ }
+
+ @Override
+ public List<Set<String>> batchGetSet(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<Set<String>> result = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ result.add(jedis.smembers(key));
+ }
+ return result;
+ }
+
+ @Override
+ public List<Map<String, String>> batchGetHash(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<Map<String, String>> result = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ Map<String, String> map = jedis.hgetAll(key);
+ map.put("hash_key", key);
+ result.add(map);
+ }
+ return result;
+ }
+
+ @Override
+ public List<List<String>> batchGetZset(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<List<String>> result = new ArrayList<>(keys.size());
+ for (String key : keys) {
+ result.add(jedis.zrange(key, 0, -1));
+ }
+ return result;
+ }
+
+ @Override
+ public void batchWriteString(List<String> keys, List<String> values, long
expireSeconds) {
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ RedisDataType.STRING.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+
+ @Override
+ public void batchWriteList(List<String> keys, List<String> values, long
expireSeconds) {
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ RedisDataType.LIST.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+
+ @Override
+ public void batchWriteSet(List<String> keys, List<String> values, long
expireSeconds) {
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ RedisDataType.SET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+
+ @Override
+ public void batchWriteHash(List<String> keys, List<String> values, long
expireSeconds) {
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ RedisDataType.HASH.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+
+ @Override
+ public void batchWriteZset(List<String> keys, List<String> values, long
expireSeconds) {
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ RedisDataType.ZSET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
new file mode 100644
index 0000000000..99bae5e733
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.client;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.Response;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// In standalone mode, pipeline can be used to improve batch read performance
+public class RedisSingleClient extends RedisClient {
+
+ public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) {
+ super(redisParameters, jedis);
+ }
+
+ @Override
+ public List<String> batchGetString(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ String[] keyArr = keys.toArray(new String[0]);
+ return jedis.mget(keyArr);
+ }
+
+ @Override
+ public List<List<String>> batchGetList(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ Pipeline pipeline = jedis.pipelined();
+ List<Response<List<String>>> responses = new ArrayList<>(keys.size());
+
+ for (String key : keys) {
+ responses.add(pipeline.lrange(key, 0, -1));
+ }
+
+ pipeline.sync();
+
+ List<List<String>> resultList = new ArrayList<>(keys.size());
+ for (Response<List<String>> response : responses) {
+ resultList.add(response.get());
+ }
+
+ return resultList;
+ }
+
+ @Override
+ public List<Set<String>> batchGetSet(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ Pipeline pipeline = jedis.pipelined();
+ List<Response<Set<String>>> responses = new ArrayList<>(keys.size());
+
+ for (String key : keys) {
+ responses.add(pipeline.smembers(key));
+ }
+
+ pipeline.sync();
+
+ List<Set<String>> resultList = new ArrayList<>(keys.size());
+ for (Response<Set<String>> response : responses) {
+ resultList.add(response.get());
+ }
+
+ return resultList;
+ }
+
+ @Override
+ public List<Map<String, String>> batchGetHash(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ Pipeline pipeline = jedis.pipelined();
+ List<Response<Map<String, String>>> responses = new
ArrayList<>(keys.size());
+
+ for (String key : keys) {
+ Response<Map<String, String>> response = pipeline.hgetAll(key);
+ responses.add(response);
+ }
+
+ pipeline.sync();
+
+ List<Map<String, String>> resultList = new ArrayList<>(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ Response<Map<String, String>> response = responses.get(i);
+ Map<String, String> map = response.get();
+ if (map != null) {
+ map.put("hash_key", keys.get(i));
+ }
+ resultList.add(map);
+ }
+
+ return resultList;
+ }
+
+ @Override
+ public List<List<String>> batchGetZset(List<String> keys) {
+ if (CollectionUtils.isEmpty(keys)) {
+ return new ArrayList<>();
+ }
+ List<Response<List<String>>> responses = new ArrayList<>(keys.size());
+ Pipeline pipelined = jedis.pipelined();
+ for (String key : keys) {
+ Response<List<String>> response = pipelined.zrange(key, 0, -1);
+ responses.add(response);
+ }
+ pipelined.sync();
+ List<List<String>> resultlist = new ArrayList<>(keys.size());
+ for (Response<List<String>> response : responses) {
+ resultlist.add(response.get());
+ }
+ return resultlist;
+ }
+
+ @Override
+ public void batchWriteString(List<String> keys, List<String> values, long
expireSeconds) {
+ Pipeline pipelined = jedis.pipelined();
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ String key = keys.get(i);
+ String value = values.get(i);
+ pipelined.set(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
+ }
+ pipelined.sync();
+ }
+
+ @Override
+ public void batchWriteList(List<String> keys, List<String> values, long
expireSeconds) {
+ Pipeline pipelined = jedis.pipelined();
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ String key = keys.get(i);
+ String value = values.get(i);
+ pipelined.lpush(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
+ }
+ pipelined.sync();
+ }
+
+ @Override
+ public void batchWriteSet(List<String> keys, List<String> values, long
expireSeconds) {
+ Pipeline pipelined = jedis.pipelined();
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ String key = keys.get(i);
+ String value = values.get(i);
+ pipelined.sadd(key, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
+ }
+ pipelined.sync();
+ }
+
+ @Override
+ public void batchWriteHash(List<String> keys, List<String> values, long
expireSeconds) {
+ Pipeline pipelined = jedis.pipelined();
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ String key = keys.get(i);
+ String value = values.get(i);
+ Map<String, String> fieldsMap = JsonUtils.toMap(value);
+ pipelined.hset(key, fieldsMap);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
+ }
+ pipelined.sync();
+ }
+
+ @Override
+ public void batchWriteZset(List<String> keys, List<String> values, long
expireSeconds) {
+ Pipeline pipelined = jedis.pipelined();
+ int size = keys.size();
+ for (int i = 0; i < size; i++) {
+ String key = keys.get(i);
+ String value = values.get(i);
+ pipelined.zadd(key, 1, value);
+ if (expireSeconds > 0) {
+ pipelined.expire(key, expireSeconds);
+ }
+ }
+ pipelined.sync();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
index c8a0a02dc7..3be5b39de9 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java
@@ -83,7 +83,7 @@ public class RedisConfig {
Options.key("data_type")
.enumType(RedisDataType.class)
.noDefaultValue()
- .withDescription("redis data types, support key hash list
set zset.");
+ .withDescription("redis data types, support string hash
list set zset.");
public static final Option<RedisConfig.Format> FORMAT =
Options.key("format")
@@ -119,6 +119,14 @@ public class RedisConfig {
.defaultValue(-1L)
.withDescription("Set redis expiration time.");
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "batch_size is used to control the size of a batch
of data during read and write operations"
+ + ",default 10");
+
public enum Format {
JSON,
// TEXT will be supported later
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
index a315e0cdae..aac874254d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java
@@ -40,6 +40,18 @@ public enum RedisDataType {
return Collections.singletonList(jedis.get(key));
}
},
+ STRING {
+ @Override
+ public void set(Jedis jedis, String key, String value, long expire) {
+ jedis.set(key, value);
+ expire(jedis, key, expire);
+ }
+
+ @Override
+ public List<String> get(Jedis jedis, String key) {
+ return Collections.singletonList(jedis.get(key));
+ }
+ },
HASH {
@Override
public void set(Jedis jedis, String key, String value, long expire) {
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index d32f3bbb0d..ef1fe104d7 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -19,6 +19,9 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient;
+import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.commons.lang3.StringUtils;
@@ -48,6 +51,7 @@ public class RedisParameters implements Serializable {
private RedisConfig.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();
+ private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
public void buildWithConfig(ReadonlyConfig config) {
// set host
@@ -84,6 +88,17 @@ public class RedisParameters implements Serializable {
}
// set redis data type verification factory createAndPrepareSource
this.redisDataType = config.get(RedisConfig.DATA_TYPE);
+ // Indicates the number of keys to attempt to return per
iteration.default 10
+ this.batchSize = config.get(RedisConfig.BATCH_SIZE);
+ }
+
+ public RedisClient buildRedisClient() {
+ Jedis jedis = this.buildJedis();
+ if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
+ return new RedisSingleClient(this, jedis);
+ } else {
+ return new RedisClusterClient(this, jedis);
+ }
}
public Jedis buildJedis() {
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 23eda57202..f03c5c48c8 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -21,24 +21,30 @@ import
org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import redis.clients.jedis.Jedis;
-
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
private final SeaTunnelRowType seaTunnelRowType;
private final RedisParameters redisParameters;
private final SerializationSchema serializationSchema;
- private final Jedis jedis;
+ private final RedisClient redisClient;
+
+ private final int batchSize;
+
+ private final List<String> keyBuffer;
+ private final List<String> valueBuffer;
public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters
redisParameters) {
this.seaTunnelRowType = seaTunnelRowType;
@@ -46,13 +52,15 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
// TODO according to format to initialize serializationSchema
// Now temporary using json serializationSchema
this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
- this.jedis = redisParameters.buildJedis();
+ this.redisClient = redisParameters.buildRedisClient();
+ this.batchSize = redisParameters.getBatchSize();
+ this.keyBuffer = new ArrayList<>(batchSize);
+ this.valueBuffer = new ArrayList<>(batchSize);
}
@Override
public void write(SeaTunnelRow element) throws IOException {
String data = new String(serializationSchema.serialize(element));
- RedisDataType redisDataType = redisParameters.getRedisDataType();
String keyField = redisParameters.getKeyField();
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
@@ -61,14 +69,51 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
} else {
key = keyField;
}
- long expire = redisParameters.getExpire();
- redisDataType.set(jedis, key, data, expire);
+ keyBuffer.add(key);
+ valueBuffer.add(data);
+ if (keyBuffer.size() >= batchSize) {
+ doBatchWrite();
+ clearBuffer();
+ }
+ }
+
+ private void clearBuffer() {
+ keyBuffer.clear();
+ valueBuffer.clear();
+ }
+
+ private void doBatchWrite() {
+ RedisDataType redisDataType = redisParameters.getRedisDataType();
+ if (RedisDataType.KEY.equals(redisDataType) ||
RedisDataType.STRING.equals(redisDataType)) {
+ redisClient.batchWriteString(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ return;
+ }
+ if (RedisDataType.LIST.equals(redisDataType)) {
+ redisClient.batchWriteList(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ return;
+ }
+ if (RedisDataType.SET.equals(redisDataType)) {
+ redisClient.batchWriteSet(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ return;
+ }
+ if (RedisDataType.HASH.equals(redisDataType)) {
+ redisClient.batchWriteHash(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ return;
+ }
+ if (RedisDataType.ZSET.equals(redisDataType)) {
+ redisClient.batchWriteZset(keyBuffer, valueBuffer,
redisParameters.getExpire());
+ return;
+ }
+ throw new RedisConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "UnSupport redisDataType,only support
string,list,hash,set,zset");
}
@Override
public void close() throws IOException {
- if (Objects.nonNull(jedis)) {
- jedis.close();
+ if (!keyBuffer.isEmpty()) {
+ doBatchWrite();
+ clearBuffer();
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
index 7dcea693fa..10825be8bc 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java
@@ -19,17 +19,21 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.source;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
-import redis.clients.jedis.Jedis;
+import org.apache.commons.collections4.CollectionUtils;
+
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
import java.io.IOException;
import java.util.List;
@@ -41,7 +45,7 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
private final RedisParameters redisParameters;
private final SingleSplitReaderContext context;
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
- private Jedis jedis;
+ private RedisClient redisClient;
public RedisSourceReader(
RedisParameters redisParameters,
@@ -54,46 +58,138 @@ public class RedisSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
@Override
public void open() throws Exception {
- this.jedis = redisParameters.buildJedis();
+ this.redisClient = redisParameters.buildRedisClient();
}
@Override
public void close() throws IOException {
- if (Objects.nonNull(jedis)) {
- jedis.close();
+ if (Objects.nonNull(redisClient)) {
+ redisClient.close();
}
}
@Override
public void internalPollNext(Collector<SeaTunnelRow> output) throws
Exception {
- Set<String> keys = jedis.keys(redisParameters.getKeysPattern());
- RedisDataType redisDataType = redisParameters.getRedisDataType();
- for (String key : keys) {
- List<String> values = redisDataType.get(jedis, key);
- for (String value : values) {
- if (deserializationSchema == null) {
- output.collect(new SeaTunnelRow(new Object[] {value}));
- } else {
- if (redisParameters.getHashKeyParseMode() ==
RedisConfig.HashKeyParseMode.KV
- && redisDataType == RedisDataType.HASH) {
- // Treat each key-value pair in the hash-key as one
piece of data
- Map<String, String> recordsMap =
JsonUtils.toMap(value);
- for (Map.Entry<String, String> entry :
recordsMap.entrySet()) {
- String k = entry.getKey();
- String v = entry.getValue();
- Map<String, String> valuesMap = JsonUtils.toMap(v);
- SeaTunnelDataType<SeaTunnelRow> seaTunnelRowType =
- deserializationSchema.getProducedType();
- valuesMap.put(((SeaTunnelRowType)
seaTunnelRowType).getFieldName(0), k);
- deserializationSchema.deserialize(
-
JsonUtils.toJsonString(valuesMap).getBytes(), output);
- }
- } else {
- deserializationSchema.deserialize(value.getBytes(),
output);
- }
- }
+ RedisDataType redisDataType =
resolveScanType(redisParameters.getRedisDataType());
+ String cursor = ScanParams.SCAN_POINTER_START;
+ String keysPattern = redisParameters.getKeysPattern();
+ int batchSize = redisParameters.getBatchSize();
+ while (true) {
+ // String cursor, int batchSize, String keysPattern, RedisType type
+ ScanResult<String> scanResult =
+ redisClient.scanKeys(cursor, batchSize, keysPattern,
redisDataType);
+ cursor = scanResult.getCursor();
+ List<String> keys = scanResult.getResult();
+ if (CollectionUtils.isEmpty(keys)) {
+ break;
+ }
+ pollNext(keys, redisDataType, output);
+ // when cursor return "0", scan end
+ if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
+ break;
}
}
context.signalNoMoreElement();
}
+
+ private void pollNext(List<String> keys, RedisDataType dataType,
Collector<SeaTunnelRow> output)
+ throws IOException {
+ if (RedisDataType.HASH.equals(dataType)) {
+ pollHashMapToNext(keys, output);
+ return;
+ }
+ if (RedisDataType.STRING.equals(dataType) ||
RedisDataType.KEY.equals(dataType)) {
+ pollStringToNext(keys, output);
+ return;
+ }
+ if (RedisDataType.LIST.equals(dataType)) {
+ pollListToNext(keys, output);
+ return;
+ }
+ if (RedisDataType.SET.equals(dataType)) {
+ pollSetToNext(keys, output);
+ return;
+ }
+ if (RedisDataType.ZSET.equals(dataType)) {
+ pollZsetToNext(keys, output);
+ return;
+ }
+ throw new RedisConnectorException(
+ CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "UnSupport redisDataType,only support
string,list,hash,set,zset");
+ }
+
+ private void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> zSetList = redisClient.batchGetZset(keys);
+ for (List<String> values : zSetList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ private void pollSetToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<Set<String>> setList = redisClient.batchGetSet(keys);
+ for (Set<String> values : setList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ private void pollListToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<List<String>> valueList = redisClient.batchGetList(keys);
+ for (List<String> values : valueList) {
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+ }
+
+ private void pollStringToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<String> values = redisClient.batchGetString(keys);
+ for (String value : values) {
+ pollValueToNext(value, output);
+ }
+ }
+
+ private void pollValueToNext(String value, Collector<SeaTunnelRow> output)
throws IOException {
+ if (deserializationSchema == null) {
+ output.collect(new SeaTunnelRow(new Object[] {value}));
+ } else {
+ deserializationSchema.deserialize(value.getBytes(), output);
+ }
+ }
+
+ private void pollHashMapToNext(List<String> keys, Collector<SeaTunnelRow>
output)
+ throws IOException {
+ List<Map<String, String>> values = redisClient.batchGetHash(keys);
+ if (deserializationSchema == null) {
+ for (Map<String, String> value : values) {
+ output.collect(new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(value)}));
+ }
+ return;
+ }
+ for (Map<String, String> recordsMap : values) {
+ if (redisParameters.getHashKeyParseMode() ==
RedisConfig.HashKeyParseMode.KV) {
+ deserializationSchema.deserialize(
+ JsonUtils.toJsonString(recordsMap).getBytes(), output);
+ } else {
+ SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(new Object[]
{JsonUtils.toJsonString(recordsMap)});
+ output.collect(seaTunnelRow);
+ }
+ }
+ }
+
+ private RedisDataType resolveScanType(RedisDataType dataType) {
+ if (RedisDataType.KEY.equals(dataType)) {
+ return RedisDataType.STRING;
+ }
+ return dataType;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 7b03818c0b..0b9ed1bedb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -56,7 +56,9 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
@@ -223,6 +225,117 @@ public class RedisIT extends TestSuiteBase implements
TestResource {
jedis.select(0);
}
+ @TestTemplate
+ public void testScanStringTypeWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+ String keyPrefix = "string_test";
+ for (int i = 0; i < 1000; i++) {
+ jedis.set(keyPrefix + i, "val");
+ }
+ Container.ExecResult execResult =
container.executeJob("/scan-string-to-redis.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("string_test_list", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ jedis.del("string_test_list");
+ for (int i = 0; i < 1000; i++) {
+ jedis.del(keyPrefix + i);
+ }
+ }
+
+ @TestTemplate
+ public void testScanListTypeWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+ String keyPrefix = "list-test-read";
+ for (int i = 0; i < 100; i++) {
+ String list = keyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.lpush(list, "val" + j);
+ }
+ }
+ Container.ExecResult execResult =
+
container.executeJob("/scan-list-test-read-to-redis-list-test-check.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("list-test-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ jedis.del("list-test-check");
+ for (int i = 0; i < 100; i++) {
+ String delKey = keyPrefix + i;
+ jedis.del(delKey);
+ }
+ }
+
+ @TestTemplate
+ public void testScanSetTypeWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+ String setKeyPrefix = "key-test-set";
+ for (int i = 0; i < 100; i++) {
+ String setKey = setKeyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.sadd(setKey, j + "");
+ }
+ }
+ Container.ExecResult execResult =
+ container.executeJob("/scan-set-to-redis-list-set-check.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("list-set-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ jedis.del("list-set-check");
+ for (int i = 0; i < 100; i++) {
+ String setKey = setKeyPrefix + i;
+ jedis.del(setKey);
+ }
+ }
+
+ @TestTemplate
+ public void testScanHashTypeWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+ String hashKeyPrefix = "key-test-hash";
+ for (int i = 0; i < 100; i++) {
+ String setKey = hashKeyPrefix + i;
+ Map<String, String> map = new HashMap<>();
+ map.put("name", "fuyoujie");
+ jedis.hset(setKey, map);
+ }
+ Container.ExecResult execResult =
+
container.executeJob("/scan-hash-to-redis-list-hash-check.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("list-hash-check", 0, -1);
+ Assertions.assertEquals(100, list.size());
+ jedis.del("list-hash-check");
+ for (int i = 0; i < 100; i++) {
+ String hashKey = hashKeyPrefix + i;
+ jedis.del(hashKey);
+ }
+ for (int i = 0; i < 100; i++) {
+ String hashKey = hashKeyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.del(hashKey);
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testScanZsetTypeWriteRedis(TestContainer container)
+ throws IOException, InterruptedException {
+ String zSetKeyPrefix = "key-test-zset";
+ for (int i = 0; i < 100; i++) {
+ String key = zSetKeyPrefix + i;
+ for (int j = 0; j < 10; j++) {
+ jedis.zadd(key, 1, j + "");
+ }
+ }
+ Container.ExecResult execResult =
+
container.executeJob("/scan-zset-to-redis-list-zset-check.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ List<String> list = jedis.lrange("list-zset-check", 0, -1);
+ Assertions.assertEquals(1000, list.size());
+ jedis.del("list-zset-check");
+ for (int i = 0; i < 100; i++) {
+ String key = zSetKeyPrefix + i;
+ jedis.del(key);
+ }
+ }
+
@TestTemplate
@DisabledOnContainer(
value = {},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
index 5b46c14171..9c1dec0c36 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
@@ -36,6 +36,7 @@ source {
keys = "key_test*"
data_type = key
db_num=1
+ batch_size = 33
}
}
@@ -47,5 +48,6 @@ sink {
key = "db_test"
data_type = list
db_num=2
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
index cd367ad872..133773d799 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf
@@ -35,6 +35,7 @@ source {
auth = "U2VhVHVubmVs"
keys = "key_test*"
data_type = key
+ batch_size = 33
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
index 17c5de5489..20fee261f0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
@@ -34,7 +34,8 @@ source {
port = 6379
auth = "U2VhVHVubmVs"
keys = "key_test*"
- data_type = key
+ data_type = string
+ batch_size = 33
}
}
@@ -45,5 +46,6 @@ sink {
auth = "U2VhVHVubmVs"
key = "key_list"
data_type = list
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
index 5b46c14171..ee76a0a839 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf
@@ -33,9 +33,9 @@ source {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- keys = "key_test*"
- data_type = key
- db_num=1
+ keys = "key-test-hash*"
+ data_type = hash
+ batch_size = 33
}
}
@@ -44,8 +44,8 @@ sink {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- key = "db_test"
+ key = "list-hash-check"
data_type = list
- db_num=2
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
index 5b46c14171..62db2eaa16 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf
@@ -33,9 +33,9 @@ source {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- keys = "key_test*"
- data_type = key
- db_num=1
+ keys = "list-test-read*"
+ data_type = list
+ batch_size = 33
}
}
@@ -44,8 +44,8 @@ sink {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- key = "db_test"
+ key = "list-test-check"
data_type = list
- db_num=2
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
index 5b46c14171..4c64224863 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf
@@ -33,9 +33,9 @@ source {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- keys = "key_test*"
- data_type = key
- db_num=1
+ keys = "key-test-set*"
+ data_type = set
+ batch_size = 33
}
}
@@ -44,8 +44,8 @@ sink {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- key = "db_test"
+ key = "list-set-check"
data_type = list
- db_num=2
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
index 5b46c14171..317e603978 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf
@@ -33,9 +33,9 @@ source {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- keys = "key_test*"
- data_type = key
- db_num=1
+ keys = "string_test*"
+ data_type = string
+ batch_size = 33
}
}
@@ -44,8 +44,8 @@ sink {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- key = "db_test"
+ key = "string_test_list"
data_type = list
- db_num=2
+ batch_size = 33
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
similarity index 91%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
index 5b46c14171..f7d2c05d37 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf
@@ -33,9 +33,9 @@ source {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- keys = "key_test*"
- data_type = key
- db_num=1
+ keys = "key-test-zset*"
+ data_type = zset
+ batch_size = 33
}
}
@@ -44,8 +44,8 @@ sink {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
- key = "db_test"
+ key = "list-zset-check"
data_type = list
- db_num=2
+ batch_size = 33
}
}
\ No newline at end of file