This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 1d6a6d0d5a7f0e933c80db7eab1da0b0cefcb505 Author: tsreaper <[email protected]> AuthorDate: Wed Apr 26 12:24:32 2023 +0800 [flink][bug] MySQL datetime and timestamp default precision should be 0 (#1028) --- .../flink/action/cdc/mysql/MySqlTypeUtils.java | 9 +++- .../action/cdc/mysql/MySqlActionITCaseBase.java | 22 +++++----- .../cdc/mysql/MySqlSyncTableActionITCase.java | 50 ++++++++++++++++++++-- 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java index 4c9a298de..1a014aeb7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java @@ -171,7 +171,9 @@ public class MySqlTypeUtils { case DATETIME: case TIMESTAMP: if (length == null) { - return DataTypes.TIMESTAMP(); + // default precision is 0 + // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html + return DataTypes.TIMESTAMP(0); } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) { if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) { // Timestamp with a fraction of seconds. @@ -184,7 +186,10 @@ public class MySqlTypeUtils { } else if (length >= 0 && length <= TimestampType.MAX_PRECISION) { return DataTypes.TIMESTAMP(length); } else { - return DataTypes.TIMESTAMP(); + throw new UnsupportedOperationException( + "Unsupported length " + + length + + " for MySQL DATETIME and TIMESTAMP types"); } case CHAR: return DataTypes.CHAR(Preconditions.checkNotNull(length)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index d53203702..774e623d7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -83,17 +83,19 @@ public class MySqlActionITCaseBase extends ActionITCaseBase { // wait for table schema to become our expected schema while (true) { - int cnt = 0; - for (int i = 0; i < table.schema().fields().size(); i++) { - DataField field = table.schema().fields().get(i); - boolean sameName = field.name().equals(rowType.getFieldNames().get(i)); - boolean sameType = field.type().equals(rowType.getFieldTypes().get(i)); - if (sameName && sameType) { - cnt++; + if (rowType.getFieldCount() == table.schema().fields().size()) { + int cnt = 0; + for (int i = 0; i < table.schema().fields().size(); i++) { + DataField field = table.schema().fields().get(i); + boolean sameName = field.name().equals(rowType.getFieldNames().get(i)); + boolean sameType = field.type().equals(rowType.getFieldTypes().get(i)); + if (sameName && sameType) { + cnt++; + } + } + if (cnt == rowType.getFieldCount()) { + break; } - } - if (cnt == rowType.getFieldCount()) { - break; } table = table.copyWithLatestSchema(); Thread.sleep(1000); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index a623b3a75..99028fa5c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -23,7 +23,10 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -38,12 +41,14 @@ import org.junit.jupiter.api.Timeout; import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -344,11 +349,11 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { // the first round checks for table creation // the second round checks for running the action on an existing table for (int i = 0; i < 2; i++) { - testAllTypesImpl(); + testAllTypesOnce(); } } - private void testAllTypesImpl() throws Exception { + private void testAllTypesOnce() throws Exception { Map<String, String> mySqlConfig = getBasicMySqlConfig(); mySqlConfig.put("database-name", DATABASE_NAME); mySqlConfig.put("table-name", "all_types_table"); @@ -369,8 +374,30 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { Collections.emptyMap(), Collections.emptyMap()); action.build(env); - JobClient jobClient = env.executeAsync(); + JobClient client = env.executeAsync(); + while (true) { + JobStatus status = client.getJobStatus().get(); + if (status == JobStatus.RUNNING) { + break; + } + Thread.sleep(1000); + } + + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword())) { + try (Statement statement = conn.createStatement()) { + testAllTypesImpl(statement); + } + } + + client.cancel().get(); + } + + private void testAllTypesImpl(Statement statement) throws Exception { RowType rowType = RowType.of( new DataType[] { @@ -574,7 +601,22 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { + "]"); waitForResult(expected, table, rowType, Arrays.asList("pt", "_id")); - jobClient.cancel().get(); + // test all types during schema evolution + try { + statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v INT"); + List<DataField> newFields = new ArrayList<>(rowType.getFields()); + newFields.add(new DataField(rowType.getFieldCount(), "v", DataTypes.INT())); + RowType newRowType = new RowType(newFields); + List<String> newExpected = + expected.stream() + .map(s -> s.substring(0, s.length() - 1) + ", NULL]") + .collect(Collectors.toList()); + waitForResult(newExpected, table, newRowType, Arrays.asList("pt", "_id")); + } finally { + statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v"); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + schemaManager.commitChanges(SchemaChange.dropColumn("v")); + } } @Test
