This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3930010386 [Improve][E2E] Improve redis e2e for delete (#10018)
3930010386 is described below
commit 39300103860e0ece4bc97cd2c6fef1092a461d2b
Author: 老王 <[email protected]>
AuthorDate: Tue Dec 2 17:46:58 2025 +0800
[Improve][E2E] Improve redis e2e for delete (#10018)
---
seatunnel-connectors-v2/connector-redis/pom.xml | 14 +
.../redis/config}/RedisContainerInfo.java | 5 +-
.../connectors/seatunnel/redis/Redis5Test.java | 12 +-
.../connectors/seatunnel/redis/Redis7Test.java | 12 +-
.../seatunnel/redis/RedisTemplateTest.java | 396 +++++++++++++++++++++
.../seatunnel/e2e/connector/redis/Redis5IT.java | 2 +
.../seatunnel/e2e/connector/redis/Redis7IT.java | 2 +
.../e2e/connector/redis/RedisClusterIT.java | 1 +
.../e2e/connector/redis/RedisMasterAndSlaveIT.java | 1 +
.../connector/redis/RedisTestCaseTemplateIT.java | 77 +---
.../resources/fake-to-redis-test-delete-key.conf | 87 -----
.../resources/fake-to-redis-test-delete-list.conf | 86 -----
.../resources/fake-to-redis-test-delete-set.conf | 86 -----
.../resources/fake-to-redis-test-delete-zset.conf | 86 -----
14 files changed, 444 insertions(+), 423 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml
b/seatunnel-connectors-v2/connector-redis/pom.xml
index 755d1fb509..eb260775c0 100644
--- a/seatunnel-connectors-v2/connector-redis/pom.xml
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -59,6 +59,20 @@
<version>${jedis.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
similarity index 89%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
rename to
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
index 61b55a6594..f06def7dce 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
@@ -14,8 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
public class RedisContainerInfo {
private final String host;
private final int port;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
similarity index 69%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
copy to
seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
index 6df6e80d6e..1601f20191 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
@@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis;
-public class Redis5IT extends RedisTestCaseTemplateIT {
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs(
+ value = OS.WINDOWS,
+ disabledReason = "There is no docker environment on the windows test
system")
+public class Redis5Test extends RedisTemplateTest {
@Override
public RedisContainerInfo getRedisContainerInfo() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
similarity index 69%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
copy to
seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
index dfa46e886a..a2f008e799 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
@@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis;
-public class Redis7IT extends RedisTestCaseTemplateIT {
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs(
+ value = OS.WINDOWS,
+ disabledReason = "There is no docker environment on the windows test
system")
+public class Redis7Test extends RedisTemplateTest {
@Override
public RedisContainerInfo getRedisContainerInfo() {
diff --git
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
new file mode 100644
index 0000000000..b7e03962c5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
+
+@Slf4j
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class RedisTemplateTest {
+
+ protected String host;
+ protected int port;
+ protected String password;
+ protected String imageName;
+ protected Jedis jedis;
+ protected GenericContainer<?> redisContainer;
+
+    /**
+     * Starts a password-protected Redis container on its own Docker network, then opens the
+     * Jedis client and runs the {@link #initSourceData()} hook.
+     */
+    @BeforeAll
+    public void startUp() {
+        initContainerInfo();
+
+        final Network dedicatedNetwork =
+                Network.builder()
+                        .createNetworkCmdModifier(
+                                cmd -> cmd.withName("SEATUNNEL-" + UUID.randomUUID()))
+                        .enableIpv6(false)
+                        .build();
+        final Slf4jLogConsumer containerLogs =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName));
+        final String serverCommand = String.format("redis-server --requirepass %s", password);
+
+        this.redisContainer =
+                new GenericContainer<>(DockerImageName.parse(imageName))
+                        .withNetwork(dedicatedNetwork)
+                        .withNetworkAliases(host)
+                        .withExposedPorts(port)
+                        .withLogConsumer(containerLogs)
+                        .withCommand(serverCommand)
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        .withStartupTimeout(Duration.ofMinutes(2)));
+
+        // Block until the container reports ready before touching it.
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+
+        this.initJedis();
+        this.initSourceData();
+    }
+
+ protected void initSourceData() {} // No-op hook called last in startUp(); subclasses may override it.
+
+ protected abstract RedisContainerInfo getRedisContainerInfo(); // Supplies the host, port, password and image name read by initContainerInfo().
+
+ private void initJedis() {
+ Jedis jedis = new Jedis(redisContainer.getHost(),
redisContainer.getFirstMappedPort());
+ jedis.auth(password);
+ jedis.ping();
+ this.jedis = jedis;
+ }
+
+ protected void initContainerInfo() {
+ RedisContainerInfo redisContainerInfo = getRedisContainerInfo();
+ this.host = redisContainerInfo.getHost();
+ this.port = redisContainerInfo.getPort();
+ this.password = redisContainerInfo.getPassword();
+ this.imageName = redisContainerInfo.getImageName();
+ }
+
+    /**
+     * Releases everything created by {@link #startUp()}: the Jedis client, the Redis container
+     * and the Docker network the container was attached to.
+     *
+     * <p>Every resource is null-checked so that a start-up failure is not masked by a
+     * {@link NullPointerException} thrown from here.
+     */
+    @AfterAll
+    public void tearDown() {
+        if (Objects.nonNull(jedis)) {
+            jedis.close();
+        }
+        if (Objects.nonNull(redisContainer)) {
+            // Fetch the network before closing the container; the network is closed last
+            // because it cannot be removed while the container is still attached to it.
+            final Network attachedNetwork = redisContainer.getNetwork();
+            redisContainer.close();
+            if (Objects.nonNull(attachedNetwork)) {
+                attachedNetwork.close();
+            }
+        }
+    }
+
+    /**
+     * Sends three inserts, an update pair and a delete through the Redis sink in HASH mode
+     * (hash field taken from {@code id}) and expects two entries to remain under the key.
+     */
+    @Test
+    public void testFakeToRedisDeleteHashTest() throws IOException {
+        final String hashKey = "hash_check";
+        final Map<String, Object> sinkOptions = new HashMap<>();
+        sinkOptions.put("hash_key_field", "id");
+        final List<SeaTunnelRow> changeLog =
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete());
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, hashKey),
+                getDefaultReadonlyConfig(RedisDataType.HASH, hashKey, sinkOptions),
+                new RedisSinkFactory(),
+                changeLog);
+
+        Assertions.assertEquals(2, jedis.hlen(hashKey));
+        jedis.del(hashKey);
+    }
+
+    /**
+     * Sends the insert/update/delete rows through the Redis sink in KEY mode with a custom
+     * {@code key_check:{id}} key template and expects two of the three keys to still exist.
+     */
+    @Test
+    public void testFakeToRedisDeleteKeyTest() throws IOException {
+        final String keyTemplate = "key_check:{id}";
+        final Map<String, Object> sinkOptions = new HashMap<>();
+        sinkOptions.put("support_custom_key", true);
+        final List<SeaTunnelRow> changeLog =
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete());
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, keyTemplate),
+                getDefaultReadonlyConfig(RedisDataType.KEY, keyTemplate, sinkOptions),
+                new RedisSinkFactory(),
+                changeLog);
+
+        // Count how many of the keys for ids 1..3 are still present.
+        int surviving = 0;
+        for (int id = 1; id <= 3; id++) {
+            if (Objects.nonNull(jedis.get("key_check:" + id))) {
+                surviving++;
+            }
+        }
+        Assertions.assertEquals(2, surviving);
+
+        for (int id = 1; id <= 3; id++) {
+            jedis.del("key_check:" + id);
+        }
+    }
+
+    /**
+     * Sends the insert/update/delete rows through the Redis sink in LIST mode and expects the
+     * list to hold two elements afterwards.
+     */
+    @Test
+    public void testFakeToRedisDeleteListTest() throws IOException {
+        final String listKey = "list_check";
+        final List<SeaTunnelRow> changeLog =
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete());
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, listKey),
+                getDefaultReadonlyConfig(RedisDataType.LIST, listKey, new HashMap<>()),
+                new RedisSinkFactory(),
+                changeLog);
+
+        Assertions.assertEquals(2, jedis.llen(listKey));
+        jedis.del(listKey);
+    }
+
+    /**
+     * Sends the insert/update/delete rows through the Redis sink in SET mode and expects the
+     * set to hold two members afterwards.
+     */
+    @Test
+    public void testFakeToRedisDeleteSetTest() throws IOException {
+        final String setKey = "set_check";
+        final List<SeaTunnelRow> changeLog =
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete());
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, setKey),
+                getDefaultReadonlyConfig(RedisDataType.SET, setKey, new HashMap<>()),
+                new RedisSinkFactory(),
+                changeLog);
+
+        Assertions.assertEquals(2, jedis.scard(setKey));
+        jedis.del(setKey);
+    }
+
+    /**
+     * Sends the insert/update/delete rows through the Redis sink in ZSET mode and expects the
+     * sorted set to hold two members afterwards.
+     */
+    @Test
+    public void testFakeToToRedisDeleteZSetTest() throws IOException {
+        final String zsetKey = "zset_check";
+        final List<SeaTunnelRow> changeLog =
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete());
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, zsetKey),
+                getDefaultReadonlyConfig(RedisDataType.ZSET, zsetKey, new HashMap<>()),
+                new RedisSinkFactory(),
+                changeLog);
+
+        Assertions.assertEquals(2, jedis.zcard(zsetKey));
+        jedis.del(zsetKey);
+    }
+
+ private ReadonlyConfig getDefaultReadonlyConfig(
+ RedisDataType dataType, String key, Map<String, Object>
otherParams) {
+ Map<String, Object> map = new HashMap<>(otherParams);
+ map.put("host", redisContainer.getHost());
+ map.put("port", redisContainer.getFirstMappedPort());
+ map.put("db_num", 0);
+ map.put("auth", password);
+ map.put("key", key);
+ map.put("data_type", dataType.name());
+ map.put("batch_size", 33);
+ return ReadonlyConfig.fromMap(map);
+ }
+
+    /** Row for id 1; the row kind is left at the {@link SeaTunnelRow} default. */
+    private SeaTunnelRow getSeaTunnelRowInsert1() {
+        return newRow(1, (byte) 1);
+    }
+
+    /** Row for id 2; the row kind is left at the {@link SeaTunnelRow} default. */
+    private SeaTunnelRow getSeaTunnelRowInsert2() {
+        return newRow(2, (byte) 1);
+    }
+
+    /** Row for id 3; the row kind is left at the {@link SeaTunnelRow} default. */
+    private SeaTunnelRow getSeaTunnelRowInsert3() {
+        return newRow(3, (byte) 1);
+    }
+
+    /** {@code UPDATE_BEFORE} image of the id 1 row, with the same values as the insert. */
+    private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+        return withKind(newRow(1, (byte) 1), RowKind.UPDATE_BEFORE);
+    }
+
+    /** {@code UPDATE_AFTER} image of the id 1 row; only {@code val_int8} changes, from 1 to 2. */
+    private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+        return withKind(newRow(1, (byte) 2), RowKind.UPDATE_AFTER);
+    }
+
+    /** {@code DELETE} row for id 2, with the same values as the insert. */
+    private SeaTunnelRow getSeaTunnelRowDelete() {
+        return withKind(newRow(2, (byte) 1), RowKind.DELETE);
+    }
+
+    /**
+     * Creates a row in the column order of {@code getColumns()}. Only the id and the
+     * {@code val_int8} value differ between the test rows; every other field is fixed.
+     *
+     * @param id value of the {@code id} column
+     * @param int8Value value of the {@code val_int8} column
+     * @return a new row whose row kind has not been set explicitly
+     */
+    private SeaTunnelRow newRow(int id, byte int8Value) {
+        return new SeaTunnelRow(
+                new Object[] {
+                    id,
+                    true,
+                    int8Value,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    /** Sets {@code kind} on {@code row} and returns the same instance. */
+    private SeaTunnelRow withKind(SeaTunnelRow row, RowKind kind) {
+        row.setRowKind(kind);
+        return row;
+    }
+
+ private CatalogTable getCatalogTable(Integer dbNum, String key) {
+ return CatalogTable.of(
+ TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
+ getTableSchema(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "");
+ }
+
+ private TableSchema getTableSchema() {
+ return new TableSchema(getColumns(), null, null);
+ }
+
+    /**
+     * Lists the eleven nullable columns of the test table, in the same order as the values of
+     * the row fixtures.
+     *
+     * @return a new mutable list of column definitions
+     */
+    private List<Column> getColumns() {
+        return new ArrayList<>(
+                Arrays.<Column>asList(
+                        new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_bool", BasicType.BOOLEAN_TYPE, 1L, 0, true, "", ""),
+                        new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_int16", BasicType.SHORT_TYPE, 16L, 0, true, "", ""),
+                        new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_int64", BasicType.LONG_TYPE, 64L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_float", BasicType.FLOAT_TYPE, 32L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_double", BasicType.DOUBLE_TYPE, 64L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_decimal", new DecimalType(16, 1), 16L, 1, true, "", ""),
+                        new PhysicalColumn(
+                                "val_string", BasicType.STRING_TYPE, 0L, 0, true, "", ""),
+                        new PhysicalColumn(
+                                "val_unixtime_micros",
+                                LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                64L,
+                                6,
+                                true,
+                                "",
+                                "")));
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
index 6df6e80d6e..b20700577e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
@@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.e2e.connector.redis;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
public class Redis5IT extends RedisTestCaseTemplateIT {
@Override
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
index dfa46e886a..e6fb8a7989 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
@@ -16,6 +16,8 @@
*/
package org.apache.seatunnel.e2e.connector.redis;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
public class Redis7IT extends RedisTestCaseTemplateIT {
@Override
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
index b762ad60dc..fb4139f303 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
index 9c84bb3bf8..48ba3163f9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
@@ -16,6 +16,7 @@
*/
package org.apache.seatunnel.e2e.connector.redis;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 9a84ac7734..0124681abb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -35,9 +35,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
-import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -48,7 +47,6 @@ import
org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -623,73 +621,6 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
jedis.del("custom-hash-check");
}
- @Test
- public void testFakeToRedisDeleteHashTest() throws IOException {
- String key = "hash_check";
- SinkFlowTestUtils.runBatchWithCheckpointDisabled(
- getCatalogTable(0, key),
- getReadonlyConfig(RedisDataType.HASH, key),
- new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
- Assertions.assertEquals(2, jedis.hlen(key));
- jedis.del(key);
- }
-
- @TestTemplate
- public void testFakeToRedisDeleteKeyTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/fake-to-redis-test-delete-key.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- int count = 0;
- for (int i = 1; i <= 3; i++) {
- String data = jedis.get("key_check:" + i);
- if (data != null) {
- count++;
- }
- }
- Assertions.assertEquals(2, count);
- for (int i = 1; i <= 3; i++) {
- jedis.del("key_check:" + i);
- }
- }
-
- @TestTemplate
- public void testFakeToRedisDeleteListTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/fake-to-redis-test-delete-list.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.llen("list_check"));
- jedis.del("list_check");
- }
-
- @TestTemplate
- public void testFakeToRedisDeleteSetTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/fake-to-redis-test-delete-set.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.scard("set_check"));
- jedis.del("set_check");
- }
-
- @TestTemplate
- public void testFakeToToRedisDeleteZSetTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/fake-to-redis-test-delete-zset.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.zcard("zset_check"));
- jedis.del("zset_check");
- }
-
@TestTemplate
@DisabledOnContainer(
value = {},
@@ -831,15 +762,15 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
public abstract RedisContainerInfo getRedisContainerInfo();
- private ReadonlyConfig getReadonlyConfig(RedisDataType dataType, String
key) {
- Map<String, Object> map = new HashMap<>();
+ private ReadonlyConfig getDefaultReadonlyConfig(
+ RedisDataType dataType, String key, Map<String, Object>
otherParams) {
+ Map<String, Object> map = new HashMap<>(otherParams);
map.put("host", redisContainer.getHost());
map.put("port", redisContainer.getFirstMappedPort());
map.put("db_num", 0);
map.put("auth", password);
map.put("key", key);
map.put("data_type", dataType.name());
- map.put("hash_key_field", "id");
map.put("batch_size", 33);
return ReadonlyConfig.fromMap(map);
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
deleted file mode 100644
index 5be915889e..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "key_check:{id}"
- data_type = key
- support_custom_key = true
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
deleted file mode 100644
index 55deb18754..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "list_check"
- data_type = list
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
deleted file mode 100644
index bd1c71128e..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "set_check"
- data_type = set
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
deleted file mode 100644
index cf80d3b00c..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "zset_check"
- data_type = zset
- batch_size = 33
- }
-}
\ No newline at end of file