This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 356fbaca1 [flink][bug] MySQL datetime and timestamp default precision 
should be 0 (#1028)
356fbaca1 is described below

commit 356fbaca11b78c3a22416e9bb23a656dd7c38193
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 26 12:24:32 2023 +0800

    [flink][bug] MySQL datetime and timestamp default precision should be 0 
(#1028)
---
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  9 +++-
 .../action/cdc/mysql/MySqlActionITCaseBase.java    | 22 +++++-----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 50 ++++++++++++++++++++--
 3 files changed, 65 insertions(+), 16 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index 4c9a298de..1a014aeb7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -171,7 +171,9 @@ public class MySqlTypeUtils {
             case DATETIME:
             case TIMESTAMP:
                 if (length == null) {
-                    return DataTypes.TIMESTAMP();
+                    // default precision is 0
+                    // see 
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+                    return DataTypes.TIMESTAMP(0);
                 } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
                     if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
                         // Timestamp with a fraction of seconds.
@@ -184,7 +186,10 @@ public class MySqlTypeUtils {
                 } else if (length >= 0 && length <= 
TimestampType.MAX_PRECISION) {
                     return DataTypes.TIMESTAMP(length);
                 } else {
-                    return DataTypes.TIMESTAMP();
+                    throw new UnsupportedOperationException(
+                            "Unsupported length "
+                                    + length
+                                    + " for MySQL DATETIME and TIMESTAMP 
types");
                 }
             case CHAR:
                 return DataTypes.CHAR(Preconditions.checkNotNull(length));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
index d53203702..774e623d7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java
@@ -83,17 +83,19 @@ public class MySqlActionITCaseBase extends ActionITCaseBase 
{
 
         // wait for table schema to become our expected schema
         while (true) {
-            int cnt = 0;
-            for (int i = 0; i < table.schema().fields().size(); i++) {
-                DataField field = table.schema().fields().get(i);
-                boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
-                boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
-                if (sameName && sameType) {
-                    cnt++;
+            if (rowType.getFieldCount() == table.schema().fields().size()) {
+                int cnt = 0;
+                for (int i = 0; i < table.schema().fields().size(); i++) {
+                    DataField field = table.schema().fields().get(i);
+                    boolean sameName = 
field.name().equals(rowType.getFieldNames().get(i));
+                    boolean sameType = 
field.type().equals(rowType.getFieldTypes().get(i));
+                    if (sameName && sameType) {
+                        cnt++;
+                    }
+                }
+                if (cnt == rowType.getFieldCount()) {
+                    break;
                 }
-            }
-            if (cnt == rowType.getFieldCount()) {
-                break;
             }
             table = table.copyWithLatestSchema();
             Thread.sleep(1000);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index a623b3a75..99028fa5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -23,7 +23,10 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -38,12 +41,14 @@ import org.junit.jupiter.api.Timeout;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -344,11 +349,11 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
         for (int i = 0; i < 2; i++) {
-            testAllTypesImpl();
+            testAllTypesOnce();
         }
     }
 
-    private void testAllTypesImpl() throws Exception {
+    private void testAllTypesOnce() throws Exception {
         Map<String, String> mySqlConfig = getBasicMySqlConfig();
         mySqlConfig.put("database-name", DATABASE_NAME);
         mySqlConfig.put("table-name", "all_types_table");
@@ -369,8 +374,30 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         Collections.emptyMap(),
                         Collections.emptyMap());
         action.build(env);
-        JobClient jobClient = env.executeAsync();
+        JobClient client = env.executeAsync();
 
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testAllTypesImpl(statement);
+            }
+        }
+
+        client.cancel().get();
+    }
+
+    private void testAllTypesImpl(Statement statement) throws Exception {
         RowType rowType =
                 RowType.of(
                         new DataType[] {
@@ -574,7 +601,22 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                                 + "]");
         waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
 
-        jobClient.cancel().get();
+        // test all types during schema evolution
+        try {
+            statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v 
INT");
+            List<DataField> newFields = new ArrayList<>(rowType.getFields());
+            newFields.add(new DataField(rowType.getFieldCount(), "v", 
DataTypes.INT()));
+            RowType newRowType = new RowType(newFields);
+            List<String> newExpected =
+                    expected.stream()
+                            .map(s -> s.substring(0, s.length() - 1) + ", 
NULL]")
+                            .collect(Collectors.toList());
+            waitForResult(newExpected, table, newRowType, Arrays.asList("pt", 
"_id"));
+        } finally {
+            statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN 
v");
+            SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+            schemaManager.commitChanges(SchemaChange.dropColumn("v"));
+        }
     }
 
     @Test

Reply via email to