Hisoka-X commented on code in PR #9654:
URL: https://github.com/apache/seatunnel/pull/9654#discussion_r2250312138
##########
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/IcebergCommonOptions.java:
##########
@@ -87,4 +87,35 @@ public class IcebergCommonOptions {
.stringType()
.noDefaultValue()
.withDescription("When using kerberos, We should specify
the keytab path");
+
+ // REST Catalog Configuration
+ public static final Option<String> REST_URI =
+ Options.key("rest.uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The URI for the REST catalog endpoint");
+
+ public static final Option<String> REST_WAREHOUSE =
+ Options.key("rest.warehouse")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The warehouse location for the REST
catalog");
+
+ public static final Option<String> REST_AUTH_TYPE =
+ Options.key("rest.auth.type")
+ .stringType()
+ .defaultValue("none")
+ .withDescription(
+ "Authentication type for REST catalog. Supported
values: none, token, aws");
+
+ public static final Option<String> REST_AUTH_TOKEN =
+ Options.key("rest.auth.token")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Authentication token for REST catalog when
auth.type is 'token'");
Review Comment:
why not directly put all config into `iceberg.catalog.config`?
##########
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java:
##########
@@ -58,8 +60,98 @@ public IcebergCatalogLoader(IcebergCommonConfig config) {
public Catalog loadCatalog() {
// When using the SeaTunnel engine, set the current class loader to
prevent loading failures
Thread.currentThread().setContextClassLoader(IcebergCatalogLoader.class.getClassLoader());
+
+ // Prepare catalog properties with REST-specific configurations
+ Map<String, String> catalogProps = prepareCatalogProperties();
+
return CatalogUtil.buildIcebergCatalog(
- config.getCatalogName(), config.getCatalogProps(),
loadHadoopConfig(config));
+ config.getCatalogName(), catalogProps,
loadHadoopConfig(config));
+ }
+
+ private Map<String, String> prepareCatalogProperties() {
+ Map<String, String> catalogProps = new HashMap<>();
+ log.info("Preparing catalog properties for REST catalog");
+
+ // Add base catalog properties if provided
+ if (config.getCatalogProps() != null) {
+ catalogProps.putAll(config.getCatalogProps());
+ log.info("Added base catalog properties: {}",
config.getCatalogProps());
+ }
+
+ // Add REST catalog specific properties
+ if (config.getRestUri() != null) {
+ catalogProps.put("uri", config.getRestUri());
+ log.info("REST URI configured: {}", config.getRestUri());
+ } else {
+ log.warn("REST URI is not configured");
+ }
+
+ if (config.getRestWarehouse() != null) {
+ catalogProps.put("warehouse", config.getRestWarehouse());
+ log.info("REST warehouse configured: {}",
config.getRestWarehouse());
+ } else {
+ log.warn("REST warehouse is not configured");
+ }
+
+ // Handle authentication based on auth type
+ String authType = config.getRestAuthType();
+ log.info("REST authentication type: {}", authType);
+
+ if ("aws".equals(authType)) {
+ log.info("Setting up AWS authentication");
+ setupAwsAuthentication(catalogProps);
+ } else if ("token".equals(authType) && config.getRestAuthToken() !=
null) {
+ catalogProps.put("token", config.getRestAuthToken());
+ log.info("Token authentication configured");
+ } else if ("none".equals(authType)) {
+ log.info("No authentication configured");
+ } else {
+ log.warn("Unknown or unsupported authentication type: {}",
authType);
+ }
+
+ log.info(
+ "Final catalog properties (excluding secrets): {}",
+ catalogProps.entrySet().stream()
+ .filter(
+ entry ->
+ !entry.getKey().contains("secret")
+ &&
!entry.getKey().contains("token"))
+ .collect(
+ java.util.stream.Collectors.toMap(
+ java.util.Map.Entry::getKey,
+ java.util.Map.Entry::getValue)));
+
+ return catalogProps;
+ }
+
+ private void setupAwsAuthentication(Map<String, String> catalogProps) {
+ log.info("Setting up AWS authentication for REST catalog");
+
+ // Enable SigV4 signing for AWS REST catalog
+ catalogProps.put("rest.sigv4-enabled", "true");
+ log.info("Enabled SigV4 signing");
+
+ // AWS credentials will be resolved through environment variables:
+ // AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
(optional)
+ // AWS_REGION or AWS_DEFAULT_REGION
+ log.info("AWS credentials will be resolved through environment
variables");
+
+ // Determine signing service based on REST URI
+ String restUri = config.getRestUri();
+ if (restUri != null) {
+ if (restUri.contains("s3tables")) {
+ catalogProps.put("rest.signing-name", "s3tables");
+ log.info("Signing service set to: s3tables for URI: {}",
restUri);
+ } else {
+ catalogProps.put("rest.signing-name", "glue");
+ log.info("Signing service set to: glue for URI: {}", restUri);
+ }
+ } else {
+ catalogProps.put("rest.signing-name", "glue");
+ log.warn("REST URI is null, defaulting signing service to: glue");
+ }
Review Comment:
Are our existing parameters already capable of reading AWS's rest catalog?
Is this PR just a parameter conversion? I prefer to just add examples to the
documentation rather than add a new config key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]