This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 693e8a787f Flink: Handle table comments in FlinkSQL (#16423)
693e8a787f is described below
commit 693e8a787fd77d327f7d482c156f82ae711ea0d0
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Thu May 21 12:29:43 2026 +0700
Flink: Handle table comments in FlinkSQL (#16423)
---
.../java/org/apache/iceberg/flink/FlinkCatalog.java | 6 ++++++
.../apache/iceberg/flink/TestFlinkCatalogTable.java | 19 +++++++++++++++++++
2 files changed, 25 insertions(+)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4bb235b811..a56c4e0ca6 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -450,6 +451,11 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
+ String comment = table.getComment();
+ if (comment != null && !comment.isEmpty()) {
+ properties.put(TableProperties.COMMENT, comment);
+ }
+
try {
icebergCatalog.createTable(
toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 091a7b67b4..020663a7de 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -243,6 +244,24 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
}
+ @TestTemplate
+ public void testCreateTableWithTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+ }
+
+ @TestTemplate
+ public void testAlterTableModifyTableComment() {
+ // create table with comment
+ sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment");
+
+ // alter table comment
+ sql("ALTER TABLE tl SET('comment' = 'new comment')");
+ assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment");
+ }
+
@TestTemplate
public void testCreateTableLocation() {
assumeThat(isHadoopCatalog)