This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a91f8b7b2 [FLINK-37577][pipeline-connector/paimon] Fix unable to apply 
column comments to Paimon tables (#4022)
a91f8b7b2 is described below

commit a91f8b7b273bdbb08c451c833ecd5838f8615a70
Author: North Lin <[email protected]>
AuthorDate: Fri May 23 18:09:40 2025 +0800

    [FLINK-37577][pipeline-connector/paimon] Fix unable to apply column 
comments to Paimon tables (#4022)
---
 .../paimon/sink/PaimonMetadataApplier.java         |  3 +-
 .../paimon/sink/PaimonMetadataApplierTest.java     | 50 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 7d2c06c15..06639ecaf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -175,7 +175,8 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
                                             column.getName(),
                                             LogicalTypeConversion.toDataType(
                                                     
DataTypeUtils.toFlinkDataType(column.getType())
-                                                            
.getLogicalType())));
+                                                            .getLogicalType()),
+                                            column.getComment()));
             List<String> partitionKeys = new ArrayList<>();
             List<String> primaryKeys = schema.primaryKeys();
             if (partitionMaps.containsKey(event.tableId())) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 20e770b2a..d349d6070 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -530,4 +530,54 @@ class PaimonMetadataApplierTest {
         
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"filesystem", "hive"})
+    public void testCreateTableWithComment(String metastore)
+            throws Catalog.TableNotExistException, 
Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
+        initialize(metastore);
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "-1");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_comment"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull(),
+                                        "comment of col1")
+                                .physicalColumn(
+                                        "col2",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col2")
+                                .physicalColumn(
+                                        "col3",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col3")
+                                .physicalColumn(
+                                        "col4",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING(),
+                                        "comment of col4")
+                                .comment("comment of table_with_comment")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        Table table = 
catalog.getTable(Identifier.fromString("test.table_with_comment"));
+        RowType tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(
+                                        0, "col1", 
DataTypes.STRING().notNull(), "comment of col1"),
+                                new DataField(1, "col2", DataTypes.STRING(), 
"comment of col2"),
+                                new DataField(2, "col3", DataTypes.STRING(), 
"comment of col3"),
+                                new DataField(3, "col4", DataTypes.STRING(), 
"comment of col4")));
+        Assertions.assertThat(table.rowType()).isEqualTo(tableSchema);
+        Assertions.assertThat(table.primaryKeys()).isEmpty();
+        Assertions.assertThat(table.partitionKeys()).isEmpty();
+        Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
+        Assertions.assertThat(table.comment()).contains("comment of 
table_with_comment");
+    }
 }

Reply via email to