This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 356fbaca1 [flink][bug] MySQL datetime and timestamp default precision
should be 0 (#1028)
356fbaca1 is described below
commit 356fbaca11b78c3a22416e9bb23a656dd7c38193
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 26 12:24:32 2023 +0800
[flink][bug] MySQL datetime and timestamp default precision should be 0
(#1028)
---
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 9 +++-
.../action/cdc/mysql/MySqlActionITCaseBase.java | 22 +++++-----
.../cdc/mysql/MySqlSyncTableActionITCase.java | 50 ++++++++++++++++++++--
3 files changed, 65 insertions(+), 16 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 4c9a298de..1a014aeb7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -171,7 +171,9 @@ public class MySqlTypeUtils {
case DATETIME:
case TIMESTAMP:
if (length == null) {
- return DataTypes.TIMESTAMP();
+ // default precision is 0
+ // see
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+ return DataTypes.TIMESTAMP(0);
} else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
// Timestamp with a fraction of seconds.
@@ -184,7 +186,10 @@ public class MySqlTypeUtils {
} else if (length >= 0 && length <=
TimestampType.MAX_PRECISION) {
return DataTypes.TIMESTAMP(length);
} else {
- return DataTypes.TIMESTAMP();
+ throw new UnsupportedOperationException(
+ "Unsupported length "
+ + length
+ + " for MySQL DATETIME and TIMESTAMP
types");
}
case CHAR:
return DataTypes.CHAR(Preconditions.checkNotNull(length));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index d53203702..774e623d7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -83,17 +83,19 @@ public class MySqlActionITCaseBase extends ActionITCaseBase
{
// wait for table schema to become our expected schema
while (true) {
- int cnt = 0;
- for (int i = 0; i < table.schema().fields().size(); i++) {
- DataField field = table.schema().fields().get(i);
- boolean sameName =
field.name().equals(rowType.getFieldNames().get(i));
- boolean sameType =
field.type().equals(rowType.getFieldTypes().get(i));
- if (sameName && sameType) {
- cnt++;
+ if (rowType.getFieldCount() == table.schema().fields().size()) {
+ int cnt = 0;
+ for (int i = 0; i < table.schema().fields().size(); i++) {
+ DataField field = table.schema().fields().get(i);
+ boolean sameName =
field.name().equals(rowType.getFieldNames().get(i));
+ boolean sameType =
field.type().equals(rowType.getFieldTypes().get(i));
+ if (sameName && sameType) {
+ cnt++;
+ }
+ }
+ if (cnt == rowType.getFieldCount()) {
+ break;
}
- }
- if (cnt == rowType.getFieldCount()) {
- break;
}
table = table.copyWithLatestSchema();
Thread.sleep(1000);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index a623b3a75..99028fa5c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -23,7 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -38,12 +41,14 @@ import org.junit.jupiter.api.Timeout;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -344,11 +349,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
// the first round checks for table creation
// the second round checks for running the action on an existing table
for (int i = 0; i < 2; i++) {
- testAllTypesImpl();
+ testAllTypesOnce();
}
}
- private void testAllTypesImpl() throws Exception {
+ private void testAllTypesOnce() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "all_types_table");
@@ -369,8 +374,30 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
Collections.emptyMap(),
Collections.emptyMap());
action.build(env);
- JobClient jobClient = env.executeAsync();
+ JobClient client = env.executeAsync();
+ while (true) {
+ JobStatus status = client.getJobStatus().get();
+ if (status == JobStatus.RUNNING) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testAllTypesImpl(statement);
+ }
+ }
+
+ client.cancel().get();
+ }
+
+ private void testAllTypesImpl(Statement statement) throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
@@ -574,7 +601,22 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
+ "]");
waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
- jobClient.cancel().get();
+ // test all types during schema evolution
+ try {
+ statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v
INT");
+ List<DataField> newFields = new ArrayList<>(rowType.getFields());
+ newFields.add(new DataField(rowType.getFieldCount(), "v",
DataTypes.INT()));
+ RowType newRowType = new RowType(newFields);
+ List<String> newExpected =
+ expected.stream()
+ .map(s -> s.substring(0, s.length() - 1) + ",
NULL]")
+ .collect(Collectors.toList());
+ waitForResult(newExpected, table, newRowType, Arrays.asList("pt",
"_id"));
+ } finally {
+ statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN
v");
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ schemaManager.commitChanges(SchemaChange.dropColumn("v"));
+ }
}
@Test