This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a91f8b7b2 [FLINK-37577][pipeline-connector/paimon] Fix unable to apply
column comments to Paimon tables (#4022)
a91f8b7b2 is described below
commit a91f8b7b273bdbb08c451c833ecd5838f8615a70
Author: North Lin <[email protected]>
AuthorDate: Fri May 23 18:09:40 2025 +0800
[FLINK-37577][pipeline-connector/paimon] Fix unable to apply column
comments to Paimon tables (#4022)
---
.../paimon/sink/PaimonMetadataApplier.java | 3 +-
.../paimon/sink/PaimonMetadataApplierTest.java | 50 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 7d2c06c15..06639ecaf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -175,7 +175,8 @@ public class PaimonMetadataApplier implements MetadataApplier {
column.getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(column.getType())
- .getLogicalType())));
+ .getLogicalType()),
+ column.getComment()));
List<String> partitionKeys = new ArrayList<>();
List<String> primaryKeys = schema.primaryKeys();
if (partitionMaps.containsKey(event.tableId())) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 20e770b2a..d349d6070 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -530,4 +530,54 @@ class PaimonMetadataApplierTest {
Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
.isEqualTo(tableSchema);
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"filesystem", "hive"})
+ public void testCreateTableWithComment(String metastore)
+ throws Catalog.TableNotExistException, Catalog.DatabaseNotEmptyException,
+ Catalog.DatabaseNotExistException, SchemaEvolveException {
+ initialize(metastore);
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put("bucket", "-1");
+ MetadataApplier metadataApplier =
+ new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ TableId.parse("test.table_with_comment"),
+ org.apache.flink.cdc.common.schema.Schema.newBuilder()
+ .physicalColumn(
+ "col1",
+ org.apache.flink.cdc.common.types.DataTypes.STRING()
+ .notNull(),
+ "comment of col1")
+ .physicalColumn(
+ "col2",
+ org.apache.flink.cdc.common.types.DataTypes.STRING(),
+ "comment of col2")
+ .physicalColumn(
+ "col3",
+ org.apache.flink.cdc.common.types.DataTypes.STRING(),
+ "comment of col3")
+ .physicalColumn(
+ "col4",
+ org.apache.flink.cdc.common.types.DataTypes.STRING(),
+ "comment of col4")
+ .comment("comment of table_with_comment")
+ .build());
+ metadataApplier.applySchemaChange(createTableEvent);
+ Table table = catalog.getTable(Identifier.fromString("test.table_with_comment"));
+ RowType tableSchema =
+ new RowType(
+ Arrays.asList(
+ new DataField(
+ 0, "col1", DataTypes.STRING().notNull(), "comment of col1"),
+ new DataField(1, "col2", DataTypes.STRING(), "comment of col2"),
+ new DataField(2, "col3", DataTypes.STRING(), "comment of col3"),
+ new DataField(3, "col4", DataTypes.STRING(), "comment of col4")));
+ Assertions.assertThat(table.rowType()).isEqualTo(tableSchema);
+ Assertions.assertThat(table.primaryKeys()).isEmpty();
+ Assertions.assertThat(table.partitionKeys()).isEmpty();
+ Assertions.assertThat(table.options()).containsEntry("bucket", "-1");
+ Assertions.assertThat(table.comment()).contains("comment of table_with_comment");
+ }
}