This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 72ab38f317 [Improve][Iceberg] Support table comment for catalog (#7936)
72ab38f317 is described below
commit 72ab38f3170b9a01766a616253aaa62a740f8a2c
Author: hailin0 <[email protected]>
AuthorDate: Tue Oct 29 20:30:51 2024 +0800
[Improve][Iceberg] Support table comment for catalog (#7936)
---
.../connectors/seatunnel/iceberg/catalog/IcebergCatalog.java | 9 +++++++--
.../connectors/seatunnel/iceberg/utils/SchemaUtils.java | 3 +++
.../connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java | 3 ++-
3 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 60591d9893..216b08f9e2 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -63,6 +63,8 @@ import static
org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtil
@Slf4j
public class IcebergCatalog implements Catalog {
+ public static final String PROPS_TABLE_COMMENT = "comment";
+
private final String catalogName;
private final ReadonlyConfig readonlyConfig;
private final IcebergCatalogLoader icebergCatalogLoader;
@@ -257,14 +259,17 @@ public class IcebergCatalog implements Catalog {
icebergTable.spec().fields().stream()
.map(PartitionField::name)
.collect(Collectors.toList());
-
+ String comment =
+ Optional.ofNullable(icebergTable.properties())
+ .map(e -> e.get(PROPS_TABLE_COMMENT))
+ .orElse(null);
return CatalogTable.of(
org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
catalogName, tablePath.getDatabaseName(),
tablePath.getTableName()),
builder.build(),
icebergTable.properties(),
partitionKeys,
- null,
+ comment,
catalogName);
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 9aba4a777d..780990572d 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.catalog.IcebergCatalog;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergTypeMapper;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaAddColumn;
@@ -105,6 +106,8 @@ public class SchemaUtils {
SinkConfig config = new SinkConfig(readonlyConfig);
// build auto create table
Map<String, String> options = new HashMap<>(table.getOptions());
+ Optional.ofNullable(table.getComment())
+ .map(e -> options.put(IcebergCatalog.PROPS_TABLE_COMMENT, e));
// override
options.putAll(config.getAutoCreateProps());
return createTable(catalog, toIcebergTableIdentifier(tablePath),
config, schema, options);
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
index 6ec5ae5783..1eeeeebdf9 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
@@ -194,7 +194,8 @@ class IcebergCatalogTest {
TableSchema schema = builder.build();
HashMap<String, String> options = new HashMap<>();
options.put("write.parquet.compression-codec", "zstd");
+ options.put("comment", "test");
return CatalogTable.of(
- tableIdentifier, schema, options,
Collections.singletonList("dt_col"), "null");
+ tableIdentifier, schema, options,
Collections.singletonList("dt_col"), "test");
}
}