This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b19335767 [Improve] add column comment for mysql cdc create table. (#1055)
b19335767 is described below

commit b19335767db0e2a99e35adf948c53219e0083ea2
Author: Guangdong Liu <[email protected]>
AuthorDate: Fri May 5 15:17:08 2023 +0800

    [Improve] add column comment for mysql cdc create table. (#1055)
---
 .../paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java  |  4 ++--
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |  9 +++++----
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java | 23 ++++++++++++++--------
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 15 ++++++++++++++
 .../src/test/resources/mysql/setup.sql             | 20 +++++++++----------
 5 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
index c61ad44cc..3ed5595e9 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlIgnoreCaseE2EeTest.java
@@ -128,8 +128,8 @@ public class MySqlIgnoreCaseE2EeTest extends MySqlCdcE2eTestBase {
                         createResultSink("result1", "fields STRING"));
 
         checkResult(
-                "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\"},"
-                        + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\"}]");
+                "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+                        + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
         clearCurrentResults();
         cancelJob(jobId);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 549eb6141..c0d723b22 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -32,6 +32,7 @@ import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 
@@ -68,13 +69,13 @@ class MySqlActionUtils {
     }
 
     static boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
-        for (Map.Entry<String, DataType> entry : mySqlSchema.fields().entrySet()) {
+        for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
             int idx = tableSchema.fieldNames().indexOf(entry.getKey());
             if (idx < 0) {
                 return false;
             }
             DataType type = tableSchema.fields().get(idx).type();
-            if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue(), type)
+            if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue().f0, type)
                     != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
                 return false;
             }
@@ -90,8 +91,8 @@ class MySqlActionUtils {
         Schema.Builder builder = Schema.newBuilder();
         builder.options(paimonConfig);
 
-        for (Map.Entry<String, DataType> entry : mySqlSchema.fields().entrySet()) {
-            builder.column(entry.getKey(), entry.getValue());
+        for (Map.Entry<String, Tuple2<DataType, String>> entry : mySqlSchema.fields().entrySet()) {
+            builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
         }
 
         if (specifiedPrimaryKeys.size() > 0) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 6b5c2831e..2b390b81e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.util.ArrayList;
@@ -36,7 +38,7 @@ public class MySqlSchema {
     private final String databaseName;
     private final String tableName;
 
-    private final LinkedHashMap<String, DataType> fields;
+    private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
     private final List<String> primaryKeys;
 
     public MySqlSchema(
@@ -51,6 +53,7 @@ public class MySqlSchema {
                 String fieldName = rs.getString("COLUMN_NAME");
                 String fieldType = rs.getString("TYPE_NAME");
                 Integer precision = rs.getInt("COLUMN_SIZE");
+                String fieldComment = rs.getString("REMARKS");
 
                 if (rs.wasNull()) {
                     precision = null;
@@ -67,7 +70,11 @@ public class MySqlSchema {
                                     fieldName, databaseName, tableName));
                     fieldName = fieldName.toLowerCase();
                 }
-                fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, precision, scale));
+                fields.put(
+                        fieldName,
+                        Tuple2.of(
+                                MySqlTypeUtils.toDataType(fieldType, precision, scale),
+                                fieldComment));
             }
         }
 
@@ -91,7 +98,7 @@ public class MySqlSchema {
         return tableName;
     }
 
-    public Map<String, DataType> fields() {
+    public Map<String, Tuple2<DataType, String>> fields() {
         return fields;
     }
 
@@ -100,14 +107,14 @@ public class MySqlSchema {
     }
 
     public MySqlSchema merge(MySqlSchema other) {
-        for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
+        for (Map.Entry<String, Tuple2<DataType, String>> entry : other.fields.entrySet()) {
             String fieldName = entry.getKey();
-            DataType newType = entry.getValue();
+            DataType newType = entry.getValue().f0;
             if (fields.containsKey(fieldName)) {
-                DataType oldType = fields.get(fieldName);
+                DataType oldType = fields.get(fieldName).f0;
                 switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, newType)) {
                     case CONVERT:
-                        fields.put(fieldName, newType);
+                        fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
                         break;
                     case EXCEPTION:
                         throw new IllegalArgumentException(
@@ -120,7 +127,7 @@ public class MySqlSchema {
                                         other.tableName));
                 }
             } else {
-                fields.put(fieldName, newType);
+                fields.put(fieldName, Tuple2.of(newType, entry.getValue().f1));
             }
         }
         if (!primaryKeys.equals(other.primaryKeys)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index d3e950d58..db8a93c21 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -51,6 +52,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** IT cases for {@link MySqlSyncTableAction}. */
@@ -95,6 +97,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
             Thread.sleep(1000);
         }
 
+        checkTableSchema(
+                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+
         try (Connection conn =
                 DriverManager.getConnection(
                         MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
@@ -106,6 +111,13 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         }
     }
 
+    private void checkTableSchema(String excepted) throws Exception {
+
+        FileStoreTable table = getFileStoreTable();
+
+        assertEquals(excepted, JsonSerdeUtil.toFlatJson(table.schema().fields()));
+    }
+
     private void testSchemaEvolutionImpl(Statement statement) throws Exception {
         FileStoreTable table = getFileStoreTable();
         statement.executeUpdate("USE paimon_sync_table");
@@ -280,6 +292,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
             Thread.sleep(1000);
         }
 
+        checkTableSchema(
+                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+
         try (Connection conn =
                 DriverManager.getConnection(
                         MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 580e13e3f..a3ee9a113 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -28,24 +28,24 @@ CREATE DATABASE paimon_sync_table;
 USE paimon_sync_table;
 
 CREATE TABLE schema_evolution_1 (
-    pt INT,
-    _id INT,
-    v1 VARCHAR(10),
+    pt INT comment  'primary',
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
     PRIMARY KEY (_id)
 );
 
 CREATE TABLE schema_evolution_2 (
-    pt INT,
-    _id INT,
-    v1 VARCHAR(10),
+    pt INT comment 'primary',
+    _id INT comment  '_id',
+    v1 VARCHAR(10) comment  'v1',
     PRIMARY KEY (_id)
 );
 
 CREATE TABLE schema_evolution_multiple (
-    _id INT,
-    v1 VARCHAR(10),
-    v2 INT,
-    v3 VARCHAR(10),
+    _id INT comment 'primary',
+    v1 VARCHAR(10) comment 'v1',
+    v2 INT comment 'v2',
+    v3 VARCHAR(10) comment 'v3',
     PRIMARY KEY (_id)
 );
 

Reply via email to