Copilot commented on code in PR #4314:
URL: https://github.com/apache/flink-cdc/pull/4314#discussion_r2929506068
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java:
##########
@@ -35,14 +35,60 @@ public class IcebergDataSinkOptions {
key("catalog.properties.type")
.stringType()
.noDefaultValue()
- .withDescription("Type of iceberg catalog, supports `hadoop` and `hive`.");
+ .withDescription(
+ "Type of iceberg catalog, supports `hadoop`, `hive` and `glue`.");
+
+ public static final ConfigOption<String> CATALOG_IMPL =
+ key("catalog.properties.catalog-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom catalog implementation class. "
+ + "For AWS Glue catalog, use `org.apache.iceberg.aws.glue.GlueCatalog`.");
+
+ public static final ConfigOption<String> IO_IMPL =
+ key("catalog.properties.io-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Custom FileIO implementation class. "
+ + "For AWS S3, use `org.apache.iceberg.aws.s3.S3FileIO`.");
+
+ public static final ConfigOption<String> GLUE_ID =
+ key("catalog.properties.glue.id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The Glue catalog ID (AWS account ID). By default, the caller's AWS account ID is used.");
+
+ public static final ConfigOption<Boolean> GLUE_SKIP_ARCHIVE =
+ key("catalog.properties.glue.skip-archive")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to skip archiving older table versions in Glue. Default is true.");
+
+ public static final ConfigOption<Boolean> GLUE_SKIP_NAME_VALIDATION =
+ key("catalog.properties.glue.skip-name-validation")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to skip name validation for Glue catalog. Default is false.");
+
+ public static final ConfigOption<String> CLIENT_REGION =
+ key("catalog.properties.client.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The AWS region for the Glue catalog client.");
public static final ConfigOption<String> WAREHOUSE =
key("catalog.properties.warehouse")
.stringType()
.noDefaultValue()
.withDescription(
- "The warehouse root path of catalog, only usable when catalog.properties.type is `hadoop`.");
+ "The warehouse root path of catalog. "
+ + "For `hadoop` catalog, this is the local or HDFS path. "
+ + "For `glue` catalog, this is typically an S3 path like `s3://my-bucket/warehouse`.");
Review Comment:
`WAREHOUSE` description was updated for Hadoop/Glue, but it no longer
mentions the Hive catalog case. Since `catalog.properties.warehouse` is also
commonly used/required for HiveCatalog setups, consider updating the
description to cover `hive` as well (or state that it applies to all catalog
types, with examples for each).
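
For example, the description could be broadened along these lines (wording is only a suggestion, and the Hive warehouse example path is an assumption, not taken from the PR):

```java
+ .withDescription(
+         "The warehouse root path of the catalog. "
+                 + "For `hadoop`, a local or HDFS path; "
+                 + "for `hive`, typically the Hive warehouse directory (e.g. `hdfs:///user/hive/warehouse`); "
+                 + "for `glue`, typically an S3 path such as `s3://my-bucket/warehouse`.");
```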
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml:
##########
@@ -291,15 +298,8 @@ limitations under the License.
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
Review Comment:
The shade plugin configuration now has an empty `<artifactSet><includes>`,
which typically results in shading nothing (or at best makes the output
dependent on plugin defaults). This looks accidental, especially since other
pipeline connectors explicitly include their external dependencies for
self-contained connector JARs. Consider restoring explicit includes (e.g.,
`org.apache.iceberg:*` and `org.apache.iceberg:iceberg-aws`) and, if Iceberg
still pulls Kafka clients, restoring the Kafka relocation to avoid dependency
conflicts.
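
For illustration, a restored configuration might look like the following sketch (the exact include list and relocation pattern are assumptions and should be checked against the other pipeline connectors' poms):

```xml
<artifactSet>
    <includes>
        <!-- Bundle the Iceberg runtime and iceberg-aws into the connector JAR.
             The wildcard already matches iceberg-aws; listed for clarity. -->
        <include>org.apache.iceberg:*</include>
    </includes>
</artifactSet>
<relocations>
    <!-- Hypothetical relocation, only needed if the Iceberg runtime still
         pulls in Kafka clients, to avoid conflicts with user classpaths. -->
    <relocation>
        <pattern>org.apache.kafka</pattern>
        <shadedPattern>org.apache.flink.cdc.connectors.iceberg.shaded.org.apache.kafka</shadedPattern>
    </relocation>
</relocations>
```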
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/pom.xml:
##########
@@ -35,6 +35,13 @@ limitations under the License.
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.20</artifactId>
<version>${iceberg.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>provided</scope>
Review Comment:
Changing Iceberg dependencies to scope `provided` means the published
connector JAR will no longer carry `iceberg-flink-runtime-1.20` / `iceberg-aws`
at runtime (and Maven Shade won’t include provided deps by default). Unless the
distribution/runtime explicitly supplies these artifacts, this will cause
`ClassNotFoundException` when the sink tries to build the catalog (especially
for Glue). Consider keeping these as non-provided (and shading them), or
document/ensure that users must add the Iceberg runtime + iceberg-aws (and AWS
SDK) to the classpath.
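
As one possible fix (a sketch only, not verified against the module's full dependency graph), the dependencies could stay at the default `compile` scope so the shade plugin bundles them:

```xml
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.20</artifactId>
    <version>${iceberg.version}</version>
    <!-- default (compile) scope so maven-shade-plugin includes it -->
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-aws</artifactId>
    <version>${iceberg.version}</version>
</dependency>
```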
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java:
##########
@@ -111,6 +111,48 @@ void testPrefixRequireOption() {
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
}
+ @Test
+ void testCreateGlueCatalogDataSink() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("catalog.properties.type", "glue")
+ .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+ .put("catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+ .put("catalog.properties.client.region", "us-east-1")
+ .put("catalog.properties.glue.skip-archive", "true")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ }
+
+ @Test
+ void testCreateGlueCatalogWithCatalogImpl() {
+ DataSinkFactory sinkFactory =
+ FactoryDiscoveryUtils.getFactoryByIdentifier("iceberg", DataSinkFactory.class);
+
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("catalog.properties.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
+ .put("catalog.properties.warehouse", "s3://my-bucket/warehouse")
+ .put("catalog.properties.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+ .build());
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ }
Review Comment:
These new tests only assert that `createDataSink(...)` returns an
`IcebergDataSink`, but `IcebergDataSinkFactory` doesn’t build the Iceberg
catalog; the actual `CatalogUtil.buildIcebergCatalog(...)` happens later (e.g.,
in `IcebergWriter`/`IcebergCommitter`/`IcebergMetadataApplier`). As a result,
the tests won’t catch missing Glue classes (e.g., if `iceberg-aws` isn’t on the
runtime classpath) or invalid Glue-specific properties. Consider extending the
test to actually invoke `CatalogUtil.buildIcebergCatalog` using the
`catalogOptions` produced by the factory and assert the resulting catalog
implementation (without making AWS calls).
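
A possible extension along these lines is sketched below. It assumes `iceberg-aws` and the AWS SDK are on the test classpath; how the `catalogOptions` map is obtained from the factory or sink is left open, since the accessor is not shown in this PR:

```java
// Sketch only: catalogOptions stands for the Map<String, String> the factory
// derives from the "catalog.properties.*" keys.
static void assertGlueCatalogBuilds(Map<String, String> catalogOptions) {
    Catalog catalog =
            CatalogUtil.buildIcebergCatalog(
                    "test-catalog", catalogOptions, new org.apache.hadoop.conf.Configuration());
    // Building the catalog exercises class loading and initialize(), which is
    // where missing Glue classes or invalid Glue properties would surface.
    Assertions.assertThat(catalog).isInstanceOf(GlueCatalog.class);
}
```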
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]