This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b5d967325 [FLINK-34865][pipeline-connector/mysql] Support sync table 
and column comment
b5d967325 is described below

commit b5d9673258069ec05eceb2b68eeaf8d2b6212147
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Thu Jan 16 10:11:29 2025 +0800

    [FLINK-34865][pipeline-connector/mysql] Support sync table and column 
comment
    
    This closes #3482
    
    Co-authored-by: Leonard Xu <xbjt...@gmail.com>
---
 .../docs/connectors/pipeline-connectors/doris.md   |  7 ++
 .../docs/connectors/pipeline-connectors/mysql.md   |  7 ++
 .../docs/connectors/pipeline-connectors/doris.md   |  7 ++
 .../docs/connectors/pipeline-connectors/mysql.md   |  8 ++
 .../org/apache/flink/cdc/common/schema/Schema.java |  4 +
 .../flink-cdc-pipeline-connector-doris/pom.xml     |  5 +-
 .../doris/factory/DorisDataSinkFactory.java        |  2 +
 .../doris/sink/DorisDataSinkOptions.java           |  6 ++
 .../doris/sink/DorisMetadataApplier.java           |  5 +-
 .../doris/sink/DorisSchemaChangeManager.java       |  4 +-
 .../mysql/factory/MySqlDataSourceFactory.java      | 12 +++
 .../connectors/mysql/source/MySqlDataSource.java   | 12 ++-
 .../mysql/source/MySqlDataSourceOptions.java       |  9 ++
 .../mysql/source/MySqlEventDeserializer.java       |  9 +-
 .../source/parser/CustomMySqlAntlrDdlParser.java   |  5 +-
 .../source/reader/MySqlPipelineRecordEmitter.java  | 13 ++-
 .../mysql/source/MySqlPipelineITCase.java          | 99 ++++++++++++++++++++++
 .../paimon/sink/PaimonMetadataApplier.java         |  1 +
 18 files changed, 203 insertions(+), 12 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
index 3443ad8d8..307abc138 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
       <td>String</td>
       <td> 是否通过FE重定向写入,直连BE写入 </td>
     </tr>
+    <tr>
+      <td>charset-encoding</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">UTF-8</td>
+      <td>String</td>
+      <td> Doris Http客户端字符集编码,默认UTF-8 </td>
+    </tr>
     <tr>
       <td>sink.enable.batch-mode</td>
       <td>optional</td>
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 2ff209442..0de46fd49 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -298,6 +298,13 @@ pipeline:
         这是一项实验性功能。
       </td>
     </tr>
+    <tr>
+      <td>include-comments.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/doris.md 
b/docs/content/docs/connectors/pipeline-connectors/doris.md
index df740c1a9..cee412c16 100644
--- a/docs/content/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
        <td>String</td>
        <td> Whether to write through FE redirection and directly connect to BE 
to write </td>
      </tr>
+     <tr>
+       <td>charset-encoding</td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;">UTF-8</td>
+       <td>String</td>
+       <td> Charset encoding for the Doris HTTP client. Defaults to UTF-8. </td>
+     </tr>
      <tr>
        <td>sink.enable.batch-mode</td>
        <td>optional</td>
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md 
b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 29c6549e5..df0faec4d 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -305,6 +305,14 @@ pipeline:
         This is an experimental feature, and subject to change in the future.
       </td> 
     </tr>
+    <tr>
+      <td>include-comments.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to include table and column comments. Defaults to false; if set to true, table and column comments will be sent.<br>
+          Note: Enabling this option has implications for memory usage.</td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
index e452ba547..61f7d36bc 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -239,6 +240,9 @@ public class Schema implements Serializable {
         if (!partitionKeys.isEmpty()) {
             sb.append(", partitionKeys=").append(String.join(";", 
partitionKeys));
         }
+        if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+            sb.append(", comment=").append(comment);
+        }
         sb.append(", options=").append(describeOptions());
 
         return sb.toString();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index 0cee7828a..3257debfe 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -28,6 +28,7 @@ limitations under the License.
 
     <properties>
         <doris.connector.version>24.0.1</doris.connector.version>
+        <mysql.connector.version>8.0.26</mysql.connector.version>
     </properties>
 
     <dependencies>
@@ -84,13 +85,13 @@ limitations under the License.
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>jdbc</artifactId>
-            <version>1.18.3</version>
+            <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
-            <version>8.0.26</version>
+            <version>${mysql.connector.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
index d0567ab03..4891309d4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
@@ -39,6 +39,7 @@ import java.util.Set;
 
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
@@ -143,6 +144,7 @@ public class DorisDataSinkFactory implements 
DataSinkFactory {
         options.add(JDBC_URL);
         options.add(PASSWORD);
         options.add(AUTO_REDIRECT);
+        options.add(CHARSET_ENCODING);
 
         options.add(SINK_CHECK_INTERVAL);
         options.add(SINK_ENABLE_2PC);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
index 62613c059..5d58bbc82 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
@@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
                     .withDescription(
                             "Use automatic redirection of fe without 
explicitly obtaining the be list");
 
+    public static final ConfigOption<String> CHARSET_ENCODING =
+            ConfigOptions.key("charset-encoding")
+                    .stringType()
+                    .defaultValue("UTF-8")
+                    .withDescription("Charset encoding for doris http client, 
default UTF-8.");
+
     // Streaming Sink options
     public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
             ConfigOptions.key("sink.enable-2pc")
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index ccf6d8798..1d6476152 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -64,6 +64,7 @@ import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUM
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
 import static 
org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
+import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
 import static 
org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
 
 /** Supports {@link DorisDataSink} to schema evolution. */
@@ -76,7 +77,8 @@ public class DorisMetadataApplier implements MetadataApplier {
 
     public DorisMetadataApplier(DorisOptions dorisOptions, Configuration 
config) {
         this.dorisOptions = dorisOptions;
-        this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions);
+        this.schemaChangeManager =
+                new DorisSchemaChangeManager(dorisOptions, 
config.get(CHARSET_ENCODING));
         this.config = config;
         this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
     }
@@ -147,6 +149,7 @@ public class DorisMetadataApplier implements 
MetadataApplier {
             tableSchema.setDatabase(tableId.getSchemaName());
             tableSchema.setFields(buildFields(schema));
             tableSchema.setDistributeKeys(buildDistributeKeys(schema));
+            tableSchema.setTableComment(schema.comment());
 
             if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
                 tableSchema.setModel(DataModel.DUPLICATE);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
index a4636f045..5358faf78 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java
@@ -27,8 +27,8 @@ import static 
org.apache.doris.flink.catalog.doris.DorisSystem.identifier;
 
 /** An enriched version of Doris' {@link SchemaChangeManager}. */
 public class DorisSchemaChangeManager extends SchemaChangeManager {
-    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
-        super(dorisOptions);
+    public DorisSchemaChangeManager(DorisOptions dorisOptions, String 
charsetEncoding) {
+        super(dorisOptions, charsetEncoding);
     }
 
     public boolean truncateTable(String databaseName, String tableName)
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 408be27e9..41e60890e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -37,9 +37,11 @@ import 
org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
 import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectPath;
 
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Tables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +68,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
@@ -132,6 +135,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         double distributionFactorLower = 
config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
 
         boolean closeIdleReaders = 
config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+        boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);
 
         Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -152,6 +156,13 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
 
         Map<String, String> configMap = config.toMap();
         OptionUtils.printOptions(IDENTIFIER, config.toMap());
+        if (includeComments) {
+            // set debezium config 'include.schema.comments' to true
+            configMap.put(
+                    DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+                            + 
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+                    "true");
+        }
 
         MySqlSourceConfigFactory configFactory =
                 new MySqlSourceConfigFactory()
@@ -310,6 +321,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
         options.add(METADATA_LIST);
+        options.add(INCLUDE_COMMENTS_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
index ae676b37a..7b4ee5eab 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
@@ -31,6 +31,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEm
 import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -57,11 +59,19 @@ public class MySqlDataSource implements DataSource {
 
     @Override
     public EventSourceProvider getEventSourceProvider() {
+        boolean includeComments =
+                sourceConfig
+                        .getDbzConfiguration()
+                        .getBoolean(
+                                
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+                                false);
+
         MySqlEventDeserializer deserializer =
                 new MySqlEventDeserializer(
                         DebeziumChangelogMode.ALL,
                         sourceConfig.isIncludeSchemaChanges(),
-                        readableMetadataList);
+                        readableMetadataList,
+                        includeComments);
 
         MySqlSource<Event> source =
                 new MySqlSource<>(
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index aa682c211..374721244 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -289,4 +289,13 @@ public class MySqlDataSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to parse schema change events generated 
by gh-ost/pt-osc utilities. Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
+            ConfigOptions.key("include-comments.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether enable include table and column comments, 
by default is false, if set to true, table and column comments will be sent. "
+                                    + "Note: Enable this option will bring the 
implications on memory usage.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
index a50ac8729..004fc5e1a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java
@@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends 
DebeziumEventDeserializationSchema {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     private final boolean includeSchemaChanges;
+    private final boolean includeComments;
 
     private transient Tables tables;
     private transient CustomMySqlAntlrDdlParser customParser;
@@ -70,23 +71,25 @@ public class MySqlEventDeserializer extends 
DebeziumEventDeserializationSchema {
 
     public MySqlEventDeserializer(
             DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) 
{
-        this(changelogMode, includeSchemaChanges, new ArrayList<>());
+        this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
     }
 
     public MySqlEventDeserializer(
             DebeziumChangelogMode changelogMode,
             boolean includeSchemaChanges,
-            List<MySqlReadableMetadata> readableMetadataList) {
+            List<MySqlReadableMetadata> readableMetadataList,
+            boolean includeComments) {
         super(new MySqlSchemaDataTypeInference(), changelogMode);
         this.includeSchemaChanges = includeSchemaChanges;
         this.readableMetadataList = readableMetadataList;
+        this.includeComments = includeComments;
     }
 
     @Override
     protected List<SchemaChangeEvent> 
deserializeSchemaChangeRecord(SourceRecord record) {
         if (includeSchemaChanges) {
             if (customParser == null) {
-                customParser = new CustomMySqlAntlrDdlParser();
+                customParser = new CustomMySqlAntlrDdlParser(includeComments);
                 tables = new Tables();
             }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
index 1264aa8d6..624d1ac41 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java
@@ -23,6 +23,7 @@ import io.debezium.antlr.AntlrDdlParserListener;
 import io.debezium.antlr.DataTypeResolver;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+import io.debezium.relational.Tables;
 
 import java.sql.Types;
 import java.util.ArrayList;
@@ -35,8 +36,8 @@ public class CustomMySqlAntlrDdlParser extends 
MySqlAntlrDdlParser {
 
     private final LinkedList<SchemaChangeEvent> parsedEvents;
 
-    public CustomMySqlAntlrDdlParser() {
-        super();
+    public CustomMySqlAntlrDdlParser(boolean includeComments) {
+        super(true, false, includeComments, null, 
Tables.TableFilter.includeAll());
         this.parsedEvents = new LinkedList<>();
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index 4f801e0fa..b5a6ec197 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.connector.base.source.reader.RecordEmitter;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
@@ -211,6 +212,7 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
                     column.comment(),
                     column.defaultValueExpression().orElse(null));
         }
+        tableBuilder.comment(table.comment());
 
         List<String> primaryKey = table.primaryKeyColumnNames();
         if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
@@ -229,7 +231,16 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
 
     private synchronized MySqlAntlrDdlParser getParser() {
         if (mySqlAntlrDdlParser == null) {
-            mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
+            boolean includeComments =
+                    sourceConfig
+                            .getDbzConfiguration()
+                            .getBoolean(
+                                    
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
+                                            .name(),
+                                    false);
+            mySqlAntlrDdlParser =
+                    new MySqlAntlrDdlParser(
+                            true, false, includeComments, null, 
Tables.TableFilter.includeAll());
         }
         return mySqlAntlrDdlParser;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 06cfa5628..1252733f4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -75,7 +75,14 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
@@ -931,6 +938,98 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                 
actual.stream().map(Object::toString).collect(Collectors.toList()));
     }
 
+    @Test
+    public void testIncludeComments() throws Exception {
+        env.setParallelism(1);
+        inventoryDatabase.createAndInitialize();
+        TableId tableId =
+                TableId.tableId(inventoryDatabase.getDatabaseName(), 
"products_with_comments");
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+                                + "  id INTEGER NOT NULL AUTO_INCREMENT 
COMMENT 'column comment of id' PRIMARY KEY,\n"
+                                + "  name VARCHAR(255) NOT NULL DEFAULT 
'flink' COMMENT 'column comment of name',\n"
+                                + "  weight FLOAT(6) COMMENT 'column comment 
of weight'\n"
+                                + ")\n"
+                                + "COMMENT 'table comment of products';",
+                        inventoryDatabase.getDatabaseName(), 
"products_with_comments");
+        executeSql(inventoryDatabase, createTableSql);
+
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(SERVER_TIME_ZONE.key(), "UTC");
+        options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".products_with_comments");
+        Factory.Context context =
+                new FactoryHelper.DefaultContext(
+                        Configuration.fromMap(options), null, 
this.getClass().getClassLoader());
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) dataSource.getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        // add some column
+        String addColumnSql =
+                String.format(
+                        "ALTER TABLE `%s`.`products_with_comments` ADD COLUMN 
`description` VARCHAR(512) comment 'column comment of description';",
+                        inventoryDatabase.getDatabaseName());
+        executeSql(inventoryDatabase, addColumnSql);
+
+        List<Event> expectedEvents = getEventsWithComments(tableId);
+        List<Event> actual = fetchResults(events, expectedEvents.size());
+        assertEqualsInAnyOrder(
+                
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
+                
actual.stream().map(Object::toString).collect(Collectors.toList()));
+    }
+
+    private void executeSql(UniqueDatabase database, String sql) throws 
SQLException {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+        }
+    }
+
+    private List<Event> getEventsWithComments(TableId tableId) {
+        return Arrays.asList(
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn(
+                                        "id", DataTypes.INT().notNull(), 
"column comment of id")
+                                .physicalColumn(
+                                        "name",
+                                        DataTypes.VARCHAR(255).notNull(),
+                                        "column comment of name",
+                                        "flink")
+                                .physicalColumn(
+                                        "weight", DataTypes.FLOAT(), "column 
comment of weight")
+                                .primaryKey(Collections.singletonList("id"))
+                                .comment("table comment of products")
+                                .build()),
+                new AddColumnEvent(
+                        tableId,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn(
+                                                "description",
+                                                DataTypes.VARCHAR(512),
+                                                "column comment of 
description")))));
+    }
+
     private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
         return new CreateTableEvent(
                 tableId,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 74601ba13..1c79ae8f9 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -183,6 +183,7 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
             }
             builder.partitionKeys(partitionKeys)
                     .primaryKey(primaryKeys)
+                    .comment(schema.comment())
                     .options(tableOptions)
                     .options(schema.options());
             catalog.createTable(tableIdToIdentifier(event), builder.build(), 
true);


Reply via email to