Copilot commented on code in PR #4314:
URL: https://github.com/apache/flink-cdc/pull/4314#discussion_r2929506068


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java:
##########
@@ -35,14 +35,60 @@ public class IcebergDataSinkOptions {
             key("catalog.properties.type")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Type of iceberg catalog, supports `hadoop` and `hive`.");
+                    .withDescription(
+                            "Type of iceberg catalog, supports `hadoop`, `hive` and `glue`.");
+
+    public static final ConfigOption<String> CATALOG_IMPL =
+            key("catalog.properties.catalog-impl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom catalog implementation class. "
+                                    + "For AWS Glue catalog, use `org.apache.iceberg.aws.glue.GlueCatalog`.");
+
+    public static final ConfigOption<String> IO_IMPL =
+            key("catalog.properties.io-impl")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom FileIO implementation class. "
+                                    + "For AWS S3, use `org.apache.iceberg.aws.s3.S3FileIO`.");
+
+    public static final ConfigOption<String> GLUE_ID =
+            key("catalog.properties.glue.id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The Glue catalog ID (AWS account ID). By default, the caller's AWS account ID is used.");
+
+    public static final ConfigOption<Boolean> GLUE_SKIP_ARCHIVE =
+            key("catalog.properties.glue.skip-archive")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to skip archiving older table versions in Glue. Default is true.");
+
+    public static final ConfigOption<Boolean> GLUE_SKIP_NAME_VALIDATION =
+            key("catalog.properties.glue.skip-name-validation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to skip name validation for Glue catalog. Default is false.");
+
+    public static final ConfigOption<String> CLIENT_REGION =
+            key("catalog.properties.client.region")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The AWS region for the Glue catalog client.");
 
     public static final ConfigOption<String> WAREHOUSE =
             key("catalog.properties.warehouse")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The warehouse root path of catalog, only usable when catalog.properties.type is `hadoop`.");
+                            "The warehouse root path of catalog. "
+                                    + "For `hadoop` catalog, this is the local or HDFS path. "
+                                    + "For `glue` catalog, this is typically an S3 path like `s3://my-bucket/warehouse`.");

Review Comment:
   The `WAREHOUSE` description now covers the `hadoop` and `glue` catalogs but 
does not mention the `hive` catalog. Since `catalog.properties.warehouse` is 
also commonly set for HiveCatalog setups, consider covering `hive` as well, or 
state that the option applies to all catalog types and give an example for 
each.
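   One possible wording, sketched as a plain Java constant so it can be checked 
in isolation. The class and constant names are hypothetical, and the `hive` 
sentence is an assumption about typical HiveCatalog deployments rather than 
text from this PR. In the connector, the string would be the argument passed to 
`.withDescription(...)` on `WAREHOUSE`.

```java
/** Hypothetical holder for a candidate description; not part of flink-cdc. */
final class WarehouseDescriptionSketch {

    private WarehouseDescriptionSketch() {}

    // Candidate text that names every catalog type listed for catalog.properties.type.
    // The `hive` sentence is an assumption, not wording taken from the PR.
    static final String WAREHOUSE_DESCRIPTION =
            "The warehouse root path of catalog, applicable to all catalog types. "
                    + "For `hadoop` catalog, this is the local or HDFS path. "
                    + "For `hive` catalog, this is typically an HDFS or object-store path. "
                    + "For `glue` catalog, this is typically an S3 path like `s3://my-bucket/warehouse`.";

    public static void main(String[] args) {
        // Print the candidate so it can be reviewed as users would see it.
        System.out.println(WAREHOUSE_DESCRIPTION);
    }
}
```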
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml:
##########
@@ -291,15 +298,8 @@ limitations under the License.
                             <shadeTestJar>false</shadeTestJar>
                             <artifactSet>
                                 <includes>

Review Comment:
   The shade plugin configuration now has an empty `<artifactSet><includes>`, 
which typically results in shading nothing (or at best leaves the output 
dependent on plugin defaults). This looks accidental, especially since other 
pipeline connectors explicitly include their external dependencies to produce 
self-contained connector JARs. Consider restoring explicit includes (e.g., 
`org.apache.iceberg:*`, which also matches `iceberg-aws`) and, if Iceberg still 
pulls in Kafka clients, restoring the Kafka relocation to avoid dependency 
conflicts.
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml:
##########
@@ -35,6 +35,13 @@ limitations under the License.
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-flink-runtime-1.20</artifactId>
             <version>${iceberg.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>provided</scope>

Review Comment:
   Changing Iceberg dependencies to scope `provided` means the published 
connector JAR will no longer carry `iceberg-flink-runtime-1.20` / `iceberg-aws` 
at runtime (and Maven Shade won’t include provided deps by default). Unless the 
distribution/runtime explicitly supplies these artifacts, this will cause 
`ClassNotFoundException` when the sink tries to build the catalog (especially 
for Glue). Consider keeping these as non-provided (and shading them), or 
document/ensure that users must add the Iceberg runtime + iceberg-aws (and AWS 
SDK) to the classpath.
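
   If the `provided` scope is kept, a fail-fast classpath check is one way to 
turn a late `ClassNotFoundException` into an early, readable error. The sketch 
below is a minimal, dependency-free illustration: `ClasspathProbe` and its 
methods are hypothetical names, not part of flink-cdc or Iceberg, and only the 
two class names already mentioned in this PR are probed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of flink-cdc: reports classes a loader cannot find. */
final class ClasspathProbe {

    private ClasspathProbe() {}

    /** Returns true if the loader can locate the class; the class is not initialized. */
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself or one of its dependencies is unavailable.
            return false;
        }
    }

    /** Returns the subset of classNames that cannot be loaded, in input order. */
    static List<String> missing(ClassLoader loader, String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            if (!isPresent(name, loader)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Class names taken from the option descriptions in this PR.
        List<String> missing =
                missing(
                        loader,
                        "org.apache.iceberg.aws.glue.GlueCatalog",
                        "org.apache.iceberg.aws.s3.S3FileIO");
        System.out.println(
                missing.isEmpty() ? "All probed classes are present" : "Missing: " + missing);
    }
}
```

   A check like this does not replace bundling or documenting the dependencies; 
it only makes the failure show up earlier and name what is missing.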
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java:
##########
@@ -111,6 +111,48 @@ void testPrefixRequireOption() {
         Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
     }
 
+    @Test
+    void testCreateGlueCatalogDataSink() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+        Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put("catalog.properties.type", "glue")
+                                .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+                                .put("catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+                                .put("catalog.properties.client.region", "us-east-1")
+                                .put("catalog.properties.glue.skip-archive", "true")
+                                .build());
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+    }
+
+    @Test
+    void testCreateGlueCatalogWithCatalogImpl() {
+        DataSinkFactory sinkFactory =
+                FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+        Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+        Configuration conf =
+                Configuration.fromMap(
+                        ImmutableMap.<String, String>builder()
+                                .put("catalog.properties.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
+                                .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+                                .put("catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+                                .build());
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, Thread.currentThread().getContextClassLoader()));
+        Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+    }

Review Comment:
   These new tests only assert that `createDataSink(...)` returns an 
`IcebergDataSink`, but `IcebergDataSinkFactory` doesn’t build the Iceberg 
catalog; the actual `CatalogUtil.buildIcebergCatalog(...)` happens later (e.g., 
in `IcebergWriter`/`IcebergCommitter`/`IcebergMetadataApplier`). As a result, 
the tests won’t catch missing Glue classes (e.g., if `iceberg-aws` isn’t on the 
runtime classpath) or invalid Glue-specific properties. Consider extending the 
test to actually invoke `CatalogUtil.buildIcebergCatalog` using the 
`catalogOptions` produced by the factory and assert the resulting catalog 
implementation (without making AWS calls).
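
   As a lighter first step than building the catalog, a test could at least 
assert on the property map that would be handed to 
`CatalogUtil.buildIcebergCatalog`. The sketch below is dependency-free and rests 
on an assumption: that the factory derives `catalogOptions` by stripping the 
`catalog.properties.` prefix from the sink configuration. 
`CatalogOptionsSketch` and `extractCatalogOptions` are hypothetical names used 
only for illustration; a real test would read the options from the factory's 
output instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the factory's option handling; not flink-cdc code. */
final class CatalogOptionsSketch {

    // Assumed prefix, based on the option keys shown in this PR.
    static final String CATALOG_PREFIX = "catalog.properties.";

    private CatalogOptionsSketch() {}

    /** Keeps only prefixed entries and strips the prefix from their keys. */
    static Map<String, String> extractCatalogOptions(Map<String, String> sinkConfig) {
        Map<String, String> catalogOptions = new HashMap<>();
        for (Map.Entry<String, String> entry : sinkConfig.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CATALOG_PREFIX)) {
                catalogOptions.put(key.substring(CATALOG_PREFIX.length()), entry.getValue());
            }
        }
        return catalogOptions;
    }

    public static void main(String[] args) {
        // Same keys and values as testCreateGlueCatalogDataSink in this PR.
        Map<String, String> sinkConfig = new HashMap<>();
        sinkConfig.put("catalog.properties.type", "glue");
        sinkConfig.put("catalog.properties.warehouse", "s3://my-bucket/warehouse");
        sinkConfig.put("catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        sinkConfig.put("catalog.properties.client.region", "us-east-1");
        sinkConfig.put("catalog.properties.glue.skip-archive", "true");

        Map<String, String> catalogOptions = extractCatalogOptions(sinkConfig);

        // Property-level checks that need no AWS access and no Iceberg classes.
        if (!"glue".equals(catalogOptions.get("type"))) {
            throw new AssertionError("catalog type was not propagated");
        }
        if (!"true".equals(catalogOptions.get("glue.skip-archive"))) {
            throw new AssertionError("glue.skip-archive was not propagated");
        }
        System.out.println("Catalog options: " + catalogOptions);
    }
}
```

   This only verifies that Glue-specific keys survive the hand-off; it does not 
detect missing Glue classes, so it complements rather than replaces the catalog 
build suggested above.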


