This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 33e91031b0 [Improve][E2E] Improve redis test for delete hash (#9946)
33e91031b0 is described below

commit 33e91031b033fdcf21a5162d5e82b40d785193b8
Author: 老王 <[email protected]>
AuthorDate: Mon Nov 3 16:30:13 2025 +0800

    [Improve][E2E] Improve redis test for delete hash (#9946)
---
 .../connector-redis-e2e/pom.xml                    |   8 +
 .../connector/redis/RedisTestCaseTemplateIT.java   | 199 ++++++++++++++++++++-
 .../resources/fake-to-redis-test-delete-hash.conf  |  87 ---------
 3 files changed, 199 insertions(+), 95 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
index 5f3283e805..f6916a76ff 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/pom.xml
@@ -33,5 +33,13 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 274aab61ee..9a84ac7734 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -18,16 +18,26 @@ package org.apache.seatunnel.e2e.connector.redis;
 
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -38,6 +48,7 @@ import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -72,6 +83,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
 import static org.awaitility.Awaitility.await;
 
 @Slf4j
@@ -104,6 +116,7 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
                         .waitingFor(
                                 new HostPortWaitStrategy()
                                         .withStartupTimeout(Duration.ofMinutes(2)));
+
         Startables.deepStart(Stream.of(redisContainer)).join();
         log.info("Redis container started");
         this.initJedis();
@@ -610,14 +623,22 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
         jedis.del("custom-hash-check");
     }
 
-    @TestTemplate
-    public void testFakeToRedisDeleteHashTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob("/fake-to-redis-test-delete-hash.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.hlen("hash_check"));
-        jedis.del("hash_check");
+    @Test
+    public void testFakeToRedisDeleteHashTest() throws IOException {
+        String key = "hash_check";
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getReadonlyConfig(RedisDataType.HASH, key),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        Assertions.assertEquals(2, jedis.hlen(key));
+        jedis.del(key);
     }
 
     @TestTemplate
@@ -809,4 +830,166 @@ public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements T
     }
 
     public abstract RedisContainerInfo getRedisContainerInfo();
+
+    private ReadonlyConfig getReadonlyConfig(RedisDataType dataType, String key) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", redisContainer.getHost());
+        map.put("port", redisContainer.getFirstMappedPort());
+        map.put("db_num", 0);
+        map.put("auth", password);
+        map.put("key", key);
+        map.put("data_type", dataType.name());
+        map.put("hash_key_field", "id");
+        map.put("batch_size", 33);
+        return ReadonlyConfig.fromMap(map);
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert1() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    1,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert2() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    2,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert3() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    3,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 2,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRow getSeaTunnelRowDelete() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            2,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.DELETE);
+        return seaTunnelRow;
+    }
+
+    private CatalogTable getCatalogTable(Integer dbNum, String key) {
+        return CatalogTable.of(
+                TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
+                getTableSchema(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                "");
+    }
+
+    private TableSchema getTableSchema() {
+        return new TableSchema(getColumns(), null, null);
+    }
+
+    private List<Column> getColumns() {
+        List<Column> columns = new ArrayList<>();
+        columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L, 0, true, "", ""));
+        columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE, 64L, 0, true, "", ""));
+        columns.add(
+                new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L, 1, true, "", ""));
+        columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE, 0L, 0, true, "", ""));
+        columns.add(
+                new PhysicalColumn(
+                        "val_unixtime_micros",
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                        64L,
+                        6,
+                        true,
+                        "",
+                        ""));
+        return columns;
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
deleted file mode 100644
index cffd866916..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-hash.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "hash_check"
-    data_type = hash
-    hash_key_field = "id"
-    batch_size = 33
-  }
-}
\ No newline at end of file

Reply via email to