This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 07510ed937 [Fix][Connector-Redis] Redis did not write successfully,
but the task did not fail (#9055)
07510ed937 is described below
commit 07510ed937a0dfa54f479bd1ab25c5f8f5cc6763
Author: limin <[email protected]>
AuthorDate: Thu Mar 27 15:04:41 2025 +0800
[Fix][Connector-Redis] Redis did not write successfully, but the task did
not fail (#9055)
Co-authored-by: limin <[email protected]>
---
.../seatunnel/redis/client/RedisSingleClient.java | 54 ++++--
.../seatunnel/redis/exception/RedisErrorCode.java | 3 +-
.../e2e/connector/redis/RedisMasterAndSlaveIT.java | 181 +++++++++++++++++++++
.../fake-to-redis-test-readonly-hash.conf | 67 ++++++++
.../resources/fake-to-redis-test-readonly-key.conf | 66 ++++++++
.../fake-to-redis-test-readonly-list.conf | 66 ++++++++
.../resources/fake-to-redis-test-readonly-set.conf | 66 ++++++++
.../fake-to-redis-test-readonly-zset.conf | 66 ++++++++
8 files changed, 553 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index c9d3ba6788..c0b6572c7c 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -20,12 +20,15 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.client;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
import org.apache.commons.collections4.CollectionUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
+import redis.clients.jedis.exceptions.JedisException;
import java.util.ArrayList;
import java.util.List;
@@ -142,6 +145,7 @@ public class RedisSingleClient extends RedisClient {
@Override
public void batchWriteString(
List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
+ List<Response<?>> responses = new ArrayList<>();
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
@@ -149,20 +153,22 @@ public class RedisSingleClient extends RedisClient {
String key = keys.get(i);
String value = values.get(i);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
- pipelined.del(key);
+ responses.add(pipelined.del(key));
} else {
- pipelined.set(key, value);
+ responses.add(pipelined.set(key, value));
if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ responses.add(pipelined.expire(key, expireSeconds));
}
}
}
pipelined.sync();
+ processResponses(responses);
}
@Override
public void batchWriteList(
List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
+ List<Response<?>> responses = new ArrayList<>();
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
@@ -170,20 +176,22 @@ public class RedisSingleClient extends RedisClient {
String key = keys.get(i);
String value = values.get(i);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
- pipelined.lrem(key, 1, value);
+ responses.add(pipelined.lrem(key, 1, value));
} else {
- pipelined.lpush(key, value);
+ responses.add(pipelined.lpush(key, value));
if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ responses.add(pipelined.expire(key, expireSeconds));
}
}
}
pipelined.sync();
+ processResponses(responses);
}
@Override
public void batchWriteSet(
List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
+ List<Response<?>> responses = new ArrayList<>();
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
@@ -191,20 +199,22 @@ public class RedisSingleClient extends RedisClient {
String key = keys.get(i);
String value = values.get(i);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
- pipelined.srem(key, value);
+ responses.add(pipelined.srem(key, value));
} else {
- pipelined.sadd(key, value);
+ responses.add(pipelined.sadd(key, value));
if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ responses.add(pipelined.expire(key, expireSeconds));
}
}
}
pipelined.sync();
+ processResponses(responses);
}
@Override
public void batchWriteHash(
List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
+ List<Response<?>> responses = new ArrayList<>();
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
@@ -214,21 +224,23 @@ public class RedisSingleClient extends RedisClient {
Map<String, String> fieldsMap = JsonUtils.toMap(value);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
- pipelined.hdel(key, entry.getKey());
+ responses.add(pipelined.hdel(key, entry.getKey()));
}
} else {
- pipelined.hset(key, fieldsMap);
+ responses.add(pipelined.hset(key, fieldsMap));
if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ responses.add(pipelined.expire(key, expireSeconds));
}
}
}
pipelined.sync();
+ processResponses(responses);
}
@Override
public void batchWriteZset(
List<RowKind> rowKinds, List<String> keys, List<String> values,
long expireSeconds) {
+ List<Response<?>> responses = new ArrayList<>();
Pipeline pipelined = jedis.pipelined();
int size = keys.size();
for (int i = 0; i < size; i++) {
@@ -236,14 +248,26 @@ public class RedisSingleClient extends RedisClient {
String key = keys.get(i);
String value = values.get(i);
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE)
{
- pipelined.zrem(key, value);
+ responses.add(pipelined.zrem(key, value));
} else {
- pipelined.zadd(key, 1, value);
+ responses.add(pipelined.zadd(key, 1, value));
if (expireSeconds > 0) {
- pipelined.expire(key, expireSeconds);
+ responses.add(pipelined.expire(key, expireSeconds));
}
}
}
pipelined.sync();
+ processResponses(responses);
+ }
+
+ private void processResponses(List<Response<?>> responseList) { // invoked by each batchWrite* method right after pipelined.sync()
+ try {
+ for (Response<?> response : responseList) { // iteration ends at the first response whose get() throws
+ // If the response is an exception object, it will be thrown; the value returned by get() is deliberately ignored
+ response.get();
+ }
+ } catch (JedisException e) { // rethrown as a connector exception with GET_RESPONSE_FAILED, keeping e as the cause
+ throw new
RedisConnectorException(RedisErrorCode.GET_RESPONSE_FAILED, e);
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
index 4d9bb745bd..a82a1425de 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -20,7 +20,8 @@ import
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum RedisErrorCode implements SeaTunnelErrorCode {
GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the
redis version"),
- INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");
+ INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"),
+ GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write
response");
private final String code;
private final String description;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
new file mode 100644
index 0000000000..9c84bb3bf8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.connector.redis;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class RedisMasterAndSlaveIT extends TestSuiteBase implements
TestResource { // E2E suite: runs Redis sink jobs against a replica started with --slaveof (presumably read-only, the Redis replica default — confirm) and checks nothing was written
+ private static RedisContainerInfo masterContainerInfo;
+ private static RedisContainerInfo slaveContainerInfo;
+ private static GenericContainer<?> master;
+ private static GenericContainer<?> slave;
+ private Jedis slaveJedis; // direct client to the replica; the tests use it to inspect the replica's contents
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception { // starts the master, then the replica, then opens the verification client
+ masterContainerInfo =
+ new RedisContainerInfo("redis-e2e-master", 6379, "SeaTunnel",
"redis:7");
+ master =
+ new
GenericContainer<>(DockerImageName.parse(masterContainerInfo.getImageName()))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(masterContainerInfo.getHost()) // this alias is what the replica's --slaveof argument points at
+ .withExposedPorts(masterContainerInfo.getPort())
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+
masterContainerInfo.getImageName())))
+ .withCommand(
+ String.format(
+ "redis-server --requirepass %s",
+ masterContainerInfo.getPassword()))
+ .waitingFor(
+ new HostPortWaitStrategy()
+
.withStartupTimeout(Duration.ofMinutes(2)));
+ master.start(); // started before the replica is created
+ log.info("Redis master container started");
+
+ slaveContainerInfo =
+ new RedisContainerInfo("redis-e2e-slave", 6379, "SeaTunnel",
"redis:7");
+ slave =
+ new
GenericContainer<>(DockerImageName.parse(slaveContainerInfo.getImageName()))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(slaveContainerInfo.getHost()) // the readonly job configs use this alias as the sink host
+ .withExposedPorts(slaveContainerInfo.getPort())
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+
slaveContainerInfo.getImageName())))
+ .withCommand(
+ String.format(
+ "redis-server --requirepass %s
--slaveof %s %s --masterauth %s",
+ slaveContainerInfo.getPassword(), // format args, in order: own password, master host, master port, master password
+ masterContainerInfo.getHost(),
+ masterContainerInfo.getPort(),
+ masterContainerInfo.getPassword()))
+ .waitingFor(
+ new HostPortWaitStrategy()
+
.withStartupTimeout(Duration.ofMinutes(2)));
+ slave.start();
+ log.info("Redis slave container started");
+ Startables.deepStart(Stream.of(master, slave)).join(); // NOTE(review): both containers were already started explicitly above, so this looks redundant — confirm
+ this.initSlaveJedis();
+ }
+
+ private void initSlaveJedis() { // connects to the replica through its mapped port, authenticates, pings, then stores the client
+ Jedis jedis = new Jedis(slave.getHost(), slave.getFirstMappedPort());
+ jedis.auth(slaveContainerInfo.getPassword());
+ jedis.ping();
+ this.slaveJedis = jedis;
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception { // closes the client first, then the replica, then the master; each is null-checked
+ if (Objects.nonNull(slaveJedis)) {
+ slaveJedis.close();
+ }
+
+ if (Objects.nonNull(slave)) {
+ slave.close();
+ }
+ if (Objects.nonNull(master)) {
+ master.close();
+ }
+ }
+
+ @TestTemplate
+ public void testWriteKeyToReadOnlyRedis(TestContainer container) { // runs the data_type = key job and expects "key_check" to be absent on the replica
+ try {
+ container.executeJob("/fake-to-redis-test-readonly-key.conf");
+ } catch (Exception e) { // NOTE(review): the log assertion below runs only if executeJob throws; if a failed job is reported some other way it is skipped — confirm
+ String containerLogs = container.getServerLogs();
+ Assertions.assertTrue(
+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+ }
+ Assertions.assertEquals(null, slaveJedis.get("key_check")); // NOTE(review): a replica that rejects writes would presumably pass this whether or not the job failed — confirm
+ }
+
+ @TestTemplate
+ public void testWriteListToReadOnlyRedis(TestContainer container) { // runs the data_type = list job and expects "list_check" to have length 0 on the replica
+ try {
+ container.executeJob("/fake-to-redis-test-readonly-list.conf");
+ } catch (Exception e) { // NOTE(review): the log assertion below runs only if executeJob throws — confirm that a failed job does throw
+ String containerLogs = container.getServerLogs();
+ Assertions.assertTrue(
+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+ }
+ Assertions.assertEquals(0, slaveJedis.llen("list_check")); // checked directly on the replica
+ }
+
+ @TestTemplate
+ public void testWriteSetToReadOnlyRedis(TestContainer container) { // runs the data_type = set job and expects "set_check" to have cardinality 0 on the replica
+ try {
+ container.executeJob("/fake-to-redis-test-readonly-set.conf");
+ } catch (Exception e) { // NOTE(review): the log assertion below runs only if executeJob throws — confirm that a failed job does throw
+ String containerLogs = container.getServerLogs();
+ Assertions.assertTrue(
+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+ }
+ Assertions.assertEquals(0, slaveJedis.scard("set_check")); // checked directly on the replica
+ }
+
+ @TestTemplate
+ public void testWriteZSetToReadOnlyRedis(TestContainer container) { // runs the data_type = zset job and expects "zset_check" to have cardinality 0 on the replica
+ try {
+ container.executeJob("/fake-to-redis-test-readonly-zset.conf");
+ } catch (Exception e) { // NOTE(review): the log assertion below runs only if executeJob throws — confirm that a failed job does throw
+ String containerLogs = container.getServerLogs();
+ Assertions.assertTrue(
+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+ }
+ Assertions.assertEquals(0, slaveJedis.zcard("zset_check")); // checked directly on the replica
+ }
+
+ @TestTemplate
+ public void testWriteHashToReadOnlyRedis(TestContainer container) { // runs the data_type = hash job and expects "hash_check" to have 0 fields on the replica
+ try {
+ container.executeJob("/fake-to-redis-test-readonly-hash.conf");
+ } catch (Exception e) { // NOTE(review): the log assertion below runs only if executeJob throws — confirm that a failed job does throw
+ String containerLogs = container.getServerLogs();
+ Assertions.assertTrue(
+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
+ }
+ Assertions.assertEquals(0, slaveJedis.hlen("hash_check")); // checked directly on the replica
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
new file mode 100644
index 0000000000..bd429f0908
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-hash.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e-slave"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "hash_check"
+ data_type = hash
+ hash_key_field = "id"
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
new file mode 100644
index 0000000000..47d41e0764
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-key.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e-slave"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "key_check"
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
new file mode 100644
index 0000000000..c3cceb88f4
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-list.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e-slave"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "list_check"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
new file mode 100644
index 0000000000..174301e823
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-set.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e-slave"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "set_check"
+ data_type = set
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
new file mode 100644
index 0000000000..28a3c9d374
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-readonly-zset.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ shade.identifier = "base64"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
+
+sink {
+ Redis {
+ host = "redis-e2e-slave"
+ port = 6379
+ auth = "U2VhVHVubmVs"
+ key = "zset_check"
+ data_type = zset
+ batch_size = 33
+ }
+}
\ No newline at end of file