This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 42afe4830d [Hotfix][e2e] Fix e2e error (#6018)
42afe4830d is described below

commit 42afe4830d6d975804f0b6d7c0bd43af5132e415
Author: hailin0 <[email protected]>
AuthorDate: Sat Dec 16 19:22:53 2023 +0800

    [Hotfix][e2e] Fix e2e error (#6018)
---
 .../e2e/connector/kafka/KafkaFormatIT.java         | 210 +++++++++++----------
 1 file changed, 108 insertions(+), 102 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index d9b88ad77a..bf09aef14b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -18,6 +18,8 @@
 
 package org.apache.seatunnel.e2e.connector.kafka;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -117,111 +119,115 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
     private static final String PG_SINK_TABLE1 = "sink";
     private static final String PG_SINK_TABLE2 = "sink2";
 
-    private static final Map<String, SeaTunnelRowType> sinkTableRowTypes = new 
HashMap<>();
+    private static final Map<String, CatalogTable> sinkTables = new 
HashMap<>();
 
     static {
-        sinkTableRowTypes.put(
+        sinkTables.put(
                 PG_SINK_TABLE1,
-                new SeaTunnelRowType(
-                        new String[] {"id", "name", "description", "weight"},
-                        new SeaTunnelDataType[] {
-                            BasicType.INT_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE
-                        }));
-
-        sinkTableRowTypes.put(
+                CatalogTableUtil.getCatalogTable(
+                        PG_SINK_TABLE1,
+                        new SeaTunnelRowType(
+                                new String[] {"id", "name", "description", 
"weight"},
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE
+                                })));
+
+        sinkTables.put(
                 PG_SINK_TABLE2,
-                new SeaTunnelRowType(
-                        new String[] {
-                            "id",
-                            "f_binary",
-                            "f_blob",
-                            "f_long_varbinary",
-                            "f_longblob",
-                            "f_tinyblob",
-                            "f_varbinary",
-                            "f_smallint",
-                            "f_smallint_unsigned",
-                            "f_mediumint",
-                            "f_mediumint_unsigned",
-                            "f_int",
-                            "f_int_unsigned",
-                            "f_integer",
-                            "f_integer_unsigned",
-                            "f_bigint",
-                            "f_bigint_unsigned",
-                            "f_numeric",
-                            "f_decimal",
-                            "f_float",
-                            "f_double",
-                            "f_double_precision",
-                            "f_longtext",
-                            "f_mediumtext",
-                            "f_text",
-                            "f_tinytext",
-                            "f_varchar",
-                            "f_date",
-                            "f_datetime",
-                            "f_timestamp",
-                            "f_bit1",
-                            "f_bit64",
-                            "f_char",
-                            "f_enum",
-                            "f_mediumblob",
-                            "f_long_varchar",
-                            "f_real",
-                            "f_time",
-                            "f_tinyint",
-                            "f_tinyint_unsigned",
-                            "f_json",
-                            "f_year"
-                        },
-                        new SeaTunnelDataType[] {
-                            BasicType.INT_TYPE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            BasicType.SHORT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.LONG_TYPE,
-                            BasicType.LONG_TYPE,
-                            new DecimalType(10, 0),
-                            new DecimalType(10, 0),
-                            new DecimalType(10, 0),
-                            BasicType.FLOAT_TYPE,
-                            BasicType.DOUBLE_TYPE,
-                            BasicType.DOUBLE_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            LocalTimeType.LOCAL_DATE_TYPE,
-                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                            BasicType.BOOLEAN_TYPE,
-                            BasicType.BYTE_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.STRING_TYPE,
-                            PrimitiveByteArrayType.INSTANCE,
-                            BasicType.STRING_TYPE,
-                            BasicType.DOUBLE_TYPE,
-                            LocalTimeType.LOCAL_TIME_TYPE,
-                            BasicType.BYTE_TYPE,
-                            BasicType.INT_TYPE,
-                            BasicType.STRING_TYPE,
-                            BasicType.INT_TYPE
-                        }));
+                CatalogTableUtil.getCatalogTable(
+                        PG_SINK_TABLE2,
+                        new SeaTunnelRowType(
+                                new String[] {
+                                    "id",
+                                    "f_binary",
+                                    "f_blob",
+                                    "f_long_varbinary",
+                                    "f_longblob",
+                                    "f_tinyblob",
+                                    "f_varbinary",
+                                    "f_smallint",
+                                    "f_smallint_unsigned",
+                                    "f_mediumint",
+                                    "f_mediumint_unsigned",
+                                    "f_int",
+                                    "f_int_unsigned",
+                                    "f_integer",
+                                    "f_integer_unsigned",
+                                    "f_bigint",
+                                    "f_bigint_unsigned",
+                                    "f_numeric",
+                                    "f_decimal",
+                                    "f_float",
+                                    "f_double",
+                                    "f_double_precision",
+                                    "f_longtext",
+                                    "f_mediumtext",
+                                    "f_text",
+                                    "f_tinytext",
+                                    "f_varchar",
+                                    "f_date",
+                                    "f_datetime",
+                                    "f_timestamp",
+                                    "f_bit1",
+                                    "f_bit64",
+                                    "f_char",
+                                    "f_enum",
+                                    "f_mediumblob",
+                                    "f_long_varchar",
+                                    "f_real",
+                                    "f_time",
+                                    "f_tinyint",
+                                    "f_tinyint_unsigned",
+                                    "f_json",
+                                    "f_year"
+                                },
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    BasicType.SHORT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.LONG_TYPE,
+                                    BasicType.LONG_TYPE,
+                                    new DecimalType(10, 0),
+                                    new DecimalType(10, 0),
+                                    new DecimalType(10, 0),
+                                    BasicType.FLOAT_TYPE,
+                                    BasicType.DOUBLE_TYPE,
+                                    BasicType.DOUBLE_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    LocalTimeType.LOCAL_DATE_TYPE,
+                                    LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                    LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                    BasicType.BOOLEAN_TYPE,
+                                    BasicType.BYTE_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    PrimitiveByteArrayType.INSTANCE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.DOUBLE_TYPE,
+                                    LocalTimeType.LOCAL_TIME_TYPE,
+                                    BasicType.BYTE_TYPE,
+                                    BasicType.INT_TYPE,
+                                    BasicType.STRING_TYPE,
+                                    BasicType.INT_TYPE
+                                })));
     }
 
     // Used to map local data paths to kafa topics that need to be written to 
kafka
@@ -839,7 +845,7 @@ public class KafkaFormatIT extends TestSuiteBase implements 
TestResource {
                 while (resultSet.next()) {
                     SeaTunnelRow row =
                             postgresJdbcRowConverter.toInternal(
-                                    resultSet, 
sinkTableRowTypes.get(tableName));
+                                    resultSet, 
sinkTables.get(tableName).getTableSchema());
                     actual.add(Arrays.asList(row.getFields()));
                 }
             }

Reply via email to