This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 33e91031b0 [Improve][E2E] Improve redis test for delete hash (#9946)
33e91031b0 is described below
commit 33e91031b033fdcf21a5162d5e82b40d785193b8
Author: 老王 <[email protected]>
AuthorDate: Mon Nov 3 16:30:13 2025 +0800
[Improve][E2E] Improve redis test for delete hash (#9946)
---
.../connector-redis-e2e/pom.xml | 8 +
.../connector/redis/RedisTestCaseTemplateIT.java | 199 ++++++++++++++++++++-
.../resources/fake-to-redis-test-delete-hash.conf | 87 ---------
3 files changed, 199 insertions(+), 95 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
index 5f3283e805..f6916a76ff 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
@@ -33,5 +33,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 274aab61ee..9a84ac7734 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -18,16 +18,26 @@ package org.apache.seatunnel.e2e.connector.redis;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -38,6 +48,7 @@ import
org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -72,6 +83,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
import static org.awaitility.Awaitility.await;
@Slf4j
@@ -104,6 +116,7 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
.waitingFor(
new HostPortWaitStrategy()
.withStartupTimeout(Duration.ofMinutes(2)));
+
Startables.deepStart(Stream.of(redisContainer)).join();
log.info("Redis container started");
this.initJedis();
@@ -610,14 +623,22 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
jedis.del("custom-hash-check");
}
- @TestTemplate
- public void testFakeToRedisDeleteHashTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob("/fake-to-redis-test-delete-hash.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.hlen("hash_check"));
- jedis.del("hash_check");
+ @Test
+ public void testFakeToRedisDeleteHashTest() throws IOException {
+ String key = "hash_check";
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ getCatalogTable(0, key),
+ getReadonlyConfig(RedisDataType.HASH, key),
+ new RedisSinkFactory(),
+ Arrays.asList(
+ getSeaTunnelRowInsert1(),
+ getSeaTunnelRowInsert2(),
+ getSeaTunnelRowInsert3(),
+ getSeaTunnelRowUpdateBefore(),
+ getSeaTunnelRowUpdateAfter(),
+ getSeaTunnelRowDelete()));
+ Assertions.assertEquals(2, jedis.hlen(key));
+ jedis.del(key);
}
@TestTemplate
@@ -809,4 +830,166 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
}
public abstract RedisContainerInfo getRedisContainerInfo();
+
+ private ReadonlyConfig getReadonlyConfig(RedisDataType dataType, String
key) {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", redisContainer.getHost());
+ map.put("port", redisContainer.getFirstMappedPort());
+ map.put("db_num", 0);
+ map.put("auth", password);
+ map.put("key", key);
+ map.put("data_type", dataType.name());
+ map.put("hash_key_field", "id");
+ map.put("batch_size", 33);
+ return ReadonlyConfig.fromMap(map);
+ }
+
+ private SeaTunnelRow getSeaTunnelRowInsert1() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private SeaTunnelRow getSeaTunnelRowInsert2() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 2,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private SeaTunnelRow getSeaTunnelRowInsert3() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 3,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+ return seaTunnelRow;
+ }
+
+ private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 2,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+ return seaTunnelRow;
+ }
+
+ private SeaTunnelRow getSeaTunnelRowDelete() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 2,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.DELETE);
+ return seaTunnelRow;
+ }
+
+ private CatalogTable getCatalogTable(Integer dbNum, String key) {
+ return CatalogTable.of(
+ TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
+ getTableSchema(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ "");
+ }
+
+ private TableSchema getTableSchema() {
+ return new TableSchema(getColumns(), null, null);
+ }
+
+ private List<Column> getColumns() {
+ List<Column> columns = new ArrayList<>();
+ columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true,
"", ""));
+ columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L,
0, true, "", ""));
+ columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0,
true, "", ""));
+ columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L,
0, true, "", ""));
+ columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L,
0, true, "", ""));
+ columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L,
0, true, "", ""));
+ columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L,
0, true, "", ""));
+ columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE,
64L, 0, true, "", ""));
+ columns.add(
+ new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L,
1, true, "", ""));
+ columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE,
0L, 0, true, "", ""));
+ columns.add(
+ new PhysicalColumn(
+ "val_unixtime_micros",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 64L,
+ 6,
+ true,
+ "",
+ ""));
+ return columns;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
deleted file mode 100644
index cffd866916..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "hash_check"
- data_type = hash
- hash_key_field = "id"
- batch_size = 33
- }
-}
\ No newline at end of file