This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eb54fdd52c [Fix][Paimon] nullable and comment attribute was lost
during automatic table creation (#9020)
eb54fdd52c is described below
commit eb54fdd52c82a146ad2ae1741cff541fd39ed29c
Author: zhangdonghao <[email protected]>
AuthorDate: Mon Mar 24 10:59:11 2025 +0800
[Fix][Paimon] nullable and comment attribute was lost during automatic
table creation (#9020)
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 18 ++-
.../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 15 +++
.../jdbc/catalog/utils/CatalogUtilsTest.java | 2 +-
.../seatunnel/paimon/catalog/PaimonCatalog.java | 19 ++-
.../AlterPaimonTableSchemaEventHandler.java | 18 ++-
.../seatunnel/paimon/utils/RowTypeConverter.java | 8 +-
.../seatunnel/paimon/utils/SchemaUtil.java | 13 +-
.../paimon/catalog/PaimonWithCommentTest.java | 140 +++++++++++++++++++++
.../paimon/utils/RowTypeConverterTest.java | 24 +++-
.../e2e/connector/paimon/AbstractPaimonIT.java | 5 +-
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 2 -
.../paimon/PaimonSinkWithSchemaEvolutionIT.java | 2 +
12 files changed, 244 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index a1d82cbe2f..bf94fc811b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -211,6 +211,7 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
Connection conn = getConnection(dbUrl);
try {
DatabaseMetaData metaData = conn.getMetaData();
+ Optional<String> comment = getTableComment(metaData, tablePath);
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath);
List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath);
TableSchema.Builder tableSchemaBuilder =
@@ -225,7 +226,7 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
tableSchemaBuilder.build(),
buildConnectorOptions(tablePath),
Collections.emptyList(),
- "",
+ comment.orElse(""),
catalogName);
} catch (SeaTunnelRuntimeException e) {
@@ -283,6 +284,21 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
return CatalogUtils.getPrimaryKey(metaData, TablePath.of(database,
schema, table));
}
+ protected Optional<String> getTableComment(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ return getTableComment(
+ metaData,
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName());
+ }
+
+ protected Optional<String> getTableComment(
+ DatabaseMetaData metaData, String database, String schema, String
table)
+ throws SQLException {
+ return CatalogUtils.getTableComment(metaData, TablePath.of(database,
schema, table));
+ }
+
protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData,
TablePath tablePath)
throws SQLException {
return getConstraintKeys(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index 070ef670af..d6f3c4c975 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -101,6 +101,21 @@ public class CatalogUtils {
return getFieldIde(identifier, fieldIde);
}
+ public static Optional<String> getTableComment(DatabaseMetaData metaData,
TablePath tablePath)
+ throws SQLException {
+ try (ResultSet rs =
+ metaData.getTables(
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName(),
+ new String[] {"TABLE"})) {
+ if (rs.next()) {
+ return Optional.ofNullable(rs.getString("REMARKS"));
+ }
+ }
+ return Optional.empty();
+ }
+
public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData
metaData, TablePath tablePath)
throws SQLException {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
index 7fb8741f05..f4e132e734 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -52,7 +52,7 @@ public class CatalogUtilsTest {
}
@Test
- void testGetCommentWithJdbcDialectTypeMapper() throws SQLException {
+ void testGetTableCommentWithJdbcDialectTypeMapper() throws SQLException {
TableSchema tableSchema =
CatalogUtils.getTableSchema(
new TestDatabaseMetaData(),
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index a14a250cf2..00d162a237 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -163,7 +163,9 @@ public class PaimonCatalog implements Catalog, PaimonTable {
try {
Schema paimonSchema =
SchemaUtil.toPaimonSchema(
- table.getTableSchema(), new
PaimonSinkConfig(readonlyConfig));
+ table.getTableSchema(),
+ new PaimonSinkConfig(readonlyConfig),
+ table.getComment());
catalog.createTable(toIdentifier(tablePath), paimonSchema,
ignoreIfExists);
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException
e) {
throw new TableAlreadyExistException(this.catalogName, tablePath);
@@ -276,7 +278,7 @@ public class PaimonCatalog implements Catalog, PaimonTable {
builder.build(),
paimonFileStoreTableTable.options(),
partitionKeys,
- null,
+ paimonFileStoreTableTable.comment().orElse(null),
catalogName);
}
@@ -343,4 +345,17 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
throw new CatalogException("ColumnNotExistException: {}", e);
}
}
+
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> schemaChanges, boolean
ignoreIfNotExists) {
+ try {
+ catalog.alterTable(identifier, schemaChanges, true);
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new CatalogException("TableNotExistException: {}", e);
+ } catch (org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException
e) {
+ throw new CatalogException("ColumnAlreadyExistException: {}", e);
+ } catch (org.apache.paimon.catalog.Catalog.ColumnNotExistException e) {
+ throw new CatalogException("ColumnNotExistException: {}", e);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
index 1872641c1e..687103474a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
@@ -41,6 +41,9 @@ import org.apache.paimon.utils.Preconditions;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
+import java.util.List;
+
import static
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.UpdatedDataFields.canConvert;
@Slf4j
@@ -95,13 +98,16 @@ public class AlterPaimonTableSchemaEventHandler {
? null
: SchemaChange.Move.after(column.getName(),
afterColumnName);
BasicTypeDefine<DataType> reconvertColumn =
PaimonTypeMapper.INSTANCE.reconvert(column);
- SchemaChange schemaChange =
+ DataType nativeType = reconvertColumn.getNativeType();
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+ schemaChanges.add(
SchemaChange.addColumn(
- column.getName(),
- reconvertColumn.getNativeType(),
- column.getComment(),
- move);
- paimonCatalog.alterTable(identifier, schemaChange, false);
+ column.getName(), nativeType.copy(true),
column.getComment(), move));
+ if (!nativeType.isNullable()) {
+ schemaChanges.add(
+ SchemaChange.updateColumnType(column.getName(),
nativeType.copy(false)));
+ }
+ paimonCatalog.alterTable(identifier, schemaChanges, false);
} else if (event instanceof AlterTableDropColumnEvent) {
String columnName = ((AlterTableDropColumnEvent)
event).getColumn();
paimonCatalog.alterTable(identifier,
SchemaChange.dropColumn(columnName), true);
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index ca5a87726f..20dedb7e9a 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -284,7 +284,7 @@ public class RowTypeConverter {
int timestampScale =
Objects.isNull(scale) ?
TimestampType.DEFAULT_PRECISION : scale;
TimestampType timestampType =
DataTypes.TIMESTAMP(timestampScale);
- builder.nativeType(timestampType);
+
builder.nativeType(timestampType.copy(column.isNullable()));
builder.dataType(timestampType.getTypeRoot().name());
builder.columnType(timestampType.toString());
builder.scale(timestampScale);
@@ -293,7 +293,7 @@ public class RowTypeConverter {
case TIME:
int timeScale = Objects.isNull(scale) ?
TimeType.DEFAULT_PRECISION : scale;
TimeType timeType = DataTypes.TIME(timeScale);
- builder.nativeType(timeType);
+ builder.nativeType(timeType.copy(column.isNullable()));
builder.columnType(timeType.toString());
builder.dataType(timeType.getTypeRoot().name());
builder.scale(timeScale);
@@ -356,7 +356,7 @@ public class RowTypeConverter {
}
DecimalType paimonDecimalType =
DataTypes.DECIMAL(precision, scale);
- builder.nativeType(paimonDecimalType);
+
builder.nativeType(paimonDecimalType.copy(column.isNullable()));
builder.columnType(paimonDecimalType.toString());
builder.dataType(paimonDecimalType.getTypeRoot().name());
builder.scale(scale);
@@ -364,7 +364,7 @@ public class RowTypeConverter {
builder.length(column.getColumnLength());
return builder.build();
default:
- builder.nativeType(visit(column.getName(), dataType));
+ builder.nativeType(visit(column.getName(),
dataType).copy(column.isNullable()));
builder.columnType(dataType.toString());
builder.length(column.getColumnLength());
builder.dataType(dataType.getSqlType().name());
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index ca825a269f..84f936a584 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.shade.org.apache.commons.lang.StringUtils;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -44,11 +45,16 @@ public class SchemaUtil {
}
public static Schema toPaimonSchema(
- TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) {
+ TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig, String
comment) {
Schema.Builder paiSchemaBuilder = Schema.newBuilder();
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
Column column = tableSchema.getColumns().get(i);
- paiSchemaBuilder.column(column.getName(), toPaimonType(column));
+ if (StringUtils.isNotBlank(column.getComment())) {
+ paiSchemaBuilder.column(
+ column.getName(), toPaimonType(column),
column.getComment());
+ } else {
+ paiSchemaBuilder.column(column.getName(),
toPaimonType(column));
+ }
}
List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys();
if (primaryKeys.isEmpty() &&
Objects.nonNull(tableSchema.getPrimaryKey())) {
@@ -69,6 +75,9 @@ public class SchemaUtil {
if (!writeProps.isEmpty()) {
paiSchemaBuilder.options(writeProps);
}
+ if (StringUtils.isNotBlank(comment)) {
+ paiSchemaBuilder.comment(comment);
+ }
return paiSchemaBuilder.build();
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java
new file mode 100644
index 0000000000..e84f15258b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class PaimonWithCommentTest {
+
+ private PaimonCatalog paimonCatalog;
+ private TableSchema.Builder schemaBuilder;
+ private final String CATALOG_NAME = "paimon_catalog";
+ private final String DATABASE_NAME = "default";
+ private final String TABLE_NAME = "test_with_comment";
+ private final String warehousePath = "/tmp/paimon";
+ private Catalog catalog;
+
+ @BeforeEach
+ public void before() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("warehouse", warehousePath);
+ properties.put("plugin_name", "Paimon");
+ properties.put("database", DATABASE_NAME);
+ properties.put("table", TABLE_NAME);
+ Map<String, String> writeProps = new HashMap<>();
+ writeProps.put("bucket", "1");
+ properties.put("paimon.table.write-props", writeProps);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+ CatalogContext catalogContext = CatalogContext.create(new
Path(warehousePath));
+ catalog = CatalogFactory.createCatalog(catalogContext);
+ paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+ paimonCatalog.open();
+ paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME),
true);
+ this.schemaBuilder =
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "c_string",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ "c_string"))
+ .column(
+ PhysicalColumn.of(
+ "c_int",
+ BasicType.INT_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_int"))
+ .column(
+ PhysicalColumn.of(
+ "c_bigint",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_bigint"));
+ }
+
+ @Test
+ public void testCreateTableWithCommentAndNullable() throws
Catalog.TableNotExistException {
+ TableSchema tableSchema =
+ schemaBuilder
+ .primaryKey(PrimaryKey.of("pk",
Collections.singletonList("c_int")))
+ .build();
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(CATALOG_NAME, DATABASE_NAME,
TABLE_NAME),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "test table");
+ paimonCatalog.createTable(
+ TablePath.of(DATABASE_NAME, null, TABLE_NAME), catalogTable,
true);
+
+ FileStoreTable table =
+ (FileStoreTable)
catalog.getTable(Identifier.create(DATABASE_NAME, TABLE_NAME));
+ Assertions.assertEquals("test table", table.comment().get());
+ table.schema()
+ .fields()
+ .forEach(
+ field -> {
+ Assertions.assertEquals(field.name(),
field.description());
+ if (field.name().equals("c_string")) {
+
Assertions.assertTrue(field.type().isNullable());
+ } else {
+
Assertions.assertFalse(field.type().isNullable());
+ }
+ });
+ }
+
+ @AfterEach
+ public void after() {
+ paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME),
false);
+ paimonCatalog.close();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index fe1214929b..0d29320768 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -54,6 +54,8 @@ public class RowTypeConverterTest {
private Column column;
+ private Column columnNotNull;
+
private TableSchema tableSchema;
public static final RowType DEFAULT_ROW_TYPE =
@@ -187,15 +189,27 @@ public class RowTypeConverterTest {
column =
PhysicalColumn.builder()
- .name("c_decimal")
+ .name("c_decimal_null")
.sourceType(DataTypes.DECIMAL(30, 8).toString())
- .nullable(false)
+ .nullable(true)
.dataType(dataType)
.columnLength(30L)
.defaultValue(3.0)
.scale(8)
.comment("c_decimal_type_define")
.build();
+
+ columnNotNull =
+ PhysicalColumn.builder()
+ .name("c_decimal_not_null")
+ .sourceType(DataTypes.DECIMAL(30, 8).toString())
+ .nullable(false)
+ .dataType(dataType)
+ .columnLength(30L)
+ .defaultValue(3.0)
+ .scale(8)
+ .comment("c_decimal_not_null")
+ .build();
}
@Test
@@ -227,6 +241,12 @@ public class RowTypeConverterTest {
public void seaTunnelColumnToPaimonDataType() {
BasicTypeDefine<DataType> dataTypeDefine =
RowTypeConverter.reconvert(column);
isEquals(column, dataTypeDefine);
+ Assertions.assertTrue(dataTypeDefine.isNullable());
+ Assertions.assertTrue(dataTypeDefine.getNativeType().isNullable());
+ BasicTypeDefine<DataType> dataTypeDefineNotNull =
RowTypeConverter.reconvert(columnNotNull);
+ isEquals(columnNotNull, dataTypeDefineNotNull);
+ Assertions.assertFalse(dataTypeDefineNotNull.isNullable());
+
Assertions.assertFalse(dataTypeDefineNotNull.getNativeType().isNullable());
}
private void isEquals(Column column, BasicTypeDefine<DataType>
dataTypeDefine) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
index 98323e9df1..d5dd8608a5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
@@ -47,8 +47,9 @@ public abstract class AbstractPaimonIT extends TestSuiteBase {
protected static final String FAKE_DATABASE1 = "FakeDatabase1";
protected static final String FAKE_TABLE2 = "FakeTable1";
protected static final String FAKE_DATABASE2 = "FakeDatabase2";
- protected String CATALOG_ROOT_DIR_WIN = "C:/Users/";
- protected String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+ private final String CATALOG_ROOT_DIR_WIN =
+ "C:/Users/" + System.getProperty("user.name") + "/tmp/";
+ private final String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE +
"/";
protected boolean isWindows;
protected boolean changeLogEnabled = false;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index cb947fba9e..9e2af9bd4e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -69,8 +69,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
public void startUp() throws Exception {
this.isWindows =
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
- CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN +
System.getProperty("user.name") + "/tmp/";
- CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
}
@AfterAll
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index d82ef45278..5c85b29863 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -134,6 +134,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends
AbstractPaimonIT implements
@BeforeAll
@Override
public void startUp() throws Exception {
+ this.isWindows =
+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
log.info("The second stage: Starting Mysql containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
log.info("Mysql Containers are started");