This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 42afe4830d [Hotfix][e2e] Fix e2e error (#6018)
42afe4830d is described below
commit 42afe4830d6d975804f0b6d7c0bd43af5132e415
Author: hailin0 <[email protected]>
AuthorDate: Sat Dec 16 19:22:53 2023 +0800
[Hotfix][e2e] Fix e2e error (#6018)
---
.../e2e/connector/kafka/KafkaFormatIT.java | 210 +++++++++++----------
1 file changed, 108 insertions(+), 102 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index d9b88ad77a..bf09aef14b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.e2e.connector.kafka;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -117,111 +119,115 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
private static final String PG_SINK_TABLE1 = "sink";
private static final String PG_SINK_TABLE2 = "sink2";
- private static final Map<String, SeaTunnelRowType> sinkTableRowTypes = new HashMap<>();
+ private static final Map<String, CatalogTable> sinkTables = new HashMap<>();
static {
- sinkTableRowTypes.put(
+ sinkTables.put(
PG_SINK_TABLE1,
- new SeaTunnelRowType(
- new String[] {"id", "name", "description", "weight"},
- new SeaTunnelDataType[] {
- BasicType.INT_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE
- }));
-
- sinkTableRowTypes.put(
+ CatalogTableUtil.getCatalogTable(
+ PG_SINK_TABLE1,
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "description", "weight"},
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE
+ })));
+
+ sinkTables.put(
PG_SINK_TABLE2,
- new SeaTunnelRowType(
- new String[] {
- "id",
- "f_binary",
- "f_blob",
- "f_long_varbinary",
- "f_longblob",
- "f_tinyblob",
- "f_varbinary",
- "f_smallint",
- "f_smallint_unsigned",
- "f_mediumint",
- "f_mediumint_unsigned",
- "f_int",
- "f_int_unsigned",
- "f_integer",
- "f_integer_unsigned",
- "f_bigint",
- "f_bigint_unsigned",
- "f_numeric",
- "f_decimal",
- "f_float",
- "f_double",
- "f_double_precision",
- "f_longtext",
- "f_mediumtext",
- "f_text",
- "f_tinytext",
- "f_varchar",
- "f_date",
- "f_datetime",
- "f_timestamp",
- "f_bit1",
- "f_bit64",
- "f_char",
- "f_enum",
- "f_mediumblob",
- "f_long_varchar",
- "f_real",
- "f_time",
- "f_tinyint",
- "f_tinyint_unsigned",
- "f_json",
- "f_year"
- },
- new SeaTunnelDataType[] {
- BasicType.INT_TYPE,
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveByteArrayType.INSTANCE,
- PrimitiveByteArrayType.INSTANCE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.INT_TYPE,
- BasicType.INT_TYPE,
- BasicType.INT_TYPE,
- BasicType.INT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.LONG_TYPE,
- new DecimalType(10, 0),
- new DecimalType(10, 0),
- new DecimalType(10, 0),
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- BasicType.DOUBLE_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.STRING_TYPE,
- BasicType.STRING_TYPE,
- PrimitiveByteArrayType.INSTANCE,
- BasicType.STRING_TYPE,
- BasicType.DOUBLE_TYPE,
- LocalTimeType.LOCAL_TIME_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.INT_TYPE,
- BasicType.STRING_TYPE,
- BasicType.INT_TYPE
- }));
+ CatalogTableUtil.getCatalogTable(
+ PG_SINK_TABLE2,
+ new SeaTunnelRowType(
+ new String[] {
+ "id",
+ "f_binary",
+ "f_blob",
+ "f_long_varbinary",
+ "f_longblob",
+ "f_tinyblob",
+ "f_varbinary",
+ "f_smallint",
+ "f_smallint_unsigned",
+ "f_mediumint",
+ "f_mediumint_unsigned",
+ "f_int",
+ "f_int_unsigned",
+ "f_integer",
+ "f_integer_unsigned",
+ "f_bigint",
+ "f_bigint_unsigned",
+ "f_numeric",
+ "f_decimal",
+ "f_float",
+ "f_double",
+ "f_double_precision",
+ "f_longtext",
+ "f_mediumtext",
+ "f_text",
+ "f_tinytext",
+ "f_varchar",
+ "f_date",
+ "f_datetime",
+ "f_timestamp",
+ "f_bit1",
+ "f_bit64",
+ "f_char",
+ "f_enum",
+ "f_mediumblob",
+ "f_long_varchar",
+ "f_real",
+ "f_time",
+ "f_tinyint",
+ "f_tinyint_unsigned",
+ "f_json",
+ "f_year"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ PrimitiveByteArrayType.INSTANCE,
+ PrimitiveByteArrayType.INSTANCE,
+ PrimitiveByteArrayType.INSTANCE,
+ PrimitiveByteArrayType.INSTANCE,
+ PrimitiveByteArrayType.INSTANCE,
+ PrimitiveByteArrayType.INSTANCE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.LONG_TYPE,
+ new DecimalType(10, 0),
+ new DecimalType(10, 0),
+ new DecimalType(10, 0),
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.STRING_TYPE,
+ PrimitiveByteArrayType.INSTANCE,
+ BasicType.STRING_TYPE,
+ BasicType.DOUBLE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.INT_TYPE
+ })));
}
// Used to map local data paths to kafa topics that need to be written to kafka
@@ -839,7 +845,7 @@ public class KafkaFormatIT extends TestSuiteBase implements TestResource {
while (resultSet.next()) {
SeaTunnelRow row =
postgresJdbcRowConverter.toInternal(
- resultSet, sinkTableRowTypes.get(tableName));
+ resultSet, sinkTables.get(tableName).getTableSchema());
actual.add(Arrays.asList(row.getFields()));
}
}