This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac423af5699 Pass-through IcebergIO catalog properties (#31726)
ac423af5699 is described below

commit ac423af5699682c5519ae8d1a3af035ba7a5eab7
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Jul 8 13:15:03 2024 -0400

    Pass-through IcebergIO catalog properties (#31726)
    
    * Pass-through iceberg catalog properties
    
    * add to CHANGES.md
    
    * trigger integration test
    
    * remove custom configuration; pass catalog name
---
 .../IO_Iceberg_Integration_Tests.json              |   3 +-
 CHANGES.md                                         |   2 +
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java  | 197 +--------------------
 .../IcebergReadSchemaTransformProvider.java        |  36 ++--
 .../IcebergSchemaTransformCatalogConfig.java       | 107 -----------
 .../IcebergWriteSchemaTransformProvider.java       |  42 ++---
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java    |  16 +-
 .../beam/sdk/io/iceberg/IcebergIOReadTest.java     |  11 +-
 .../beam/sdk/io/iceberg/IcebergIOWriteTest.java    |  31 ++--
 .../IcebergReadSchemaTransformProviderTest.java    |  34 ++--
 .../IcebergSchemaTransformTranslationTest.java     |  49 +++--
 .../IcebergWriteSchemaTransformProviderTest.java   |  34 ++--
 .../apache/beam/sdk/io/iceberg/ScanSourceTest.java |  28 ++-
 13 files changed, 152 insertions(+), 438 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index a03c067d2c4..1efc8e9e440 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
 {
-    "comment": "Modify this file in a trivial way to cause this test suite to 
run"
+    "comment": "Modify this file in a trivial way to cause this test suite to 
run",
+    "modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index 0a620038f11..85f1be48cfb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,12 +67,14 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by 
setting the model_identifier parameter (Python) 
([#31665](https://github.com/apache/beam/issues/31665)).
+* [IcebergIO] All specified catalog properties are passed through to the 
connector ([#31726](https://github.com/apache/beam/pull/31726))
 * Removed a 3rd party LGPL dependency from the Go SDK 
([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with 
Streaming Engine (Java) 
([[#18200](https://github.com/apache/beam/issues/18200)])
 
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog 
properties in a key-store fashion 
([#31726](https://github.com/apache/beam/pull/31726))
 
 ## Deprecations
 
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index fefef4aa491..2956d75a266 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,214 +19,35 @@ package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import javax.annotation.Nullable;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import java.util.Properties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
-
-  @Pure
-  public abstract String getName();
-
-  /* Core Properties */
-  @Pure
-  public abstract @Nullable String getIcebergCatalogType();
-
-  @Pure
-  public abstract @Nullable String getCatalogImplementation();
-
-  @Pure
-  public abstract @Nullable String getFileIOImplementation();
-
-  @Pure
-  public abstract @Nullable String getWarehouseLocation();
-
-  @Pure
-  public abstract @Nullable String getMetricsReporterImplementation();
-
-  /* Caching */
-  @Pure
-  public abstract boolean getCacheEnabled();
-
-  @Pure
-  public abstract boolean getCacheCaseSensitive();
-
-  @Pure
-  public abstract long getCacheExpirationIntervalMillis();
-
-  @Pure
-  public abstract boolean getIOManifestCacheEnabled();
-
-  @Pure
-  public abstract long getIOManifestCacheExpirationIntervalMillis();
-
-  @Pure
-  public abstract long getIOManifestCacheMaxTotalBytes();
-
-  @Pure
-  public abstract long getIOManifestCacheMaxContentLength();
-
-  @Pure
-  public abstract @Nullable String getUri();
-
-  @Pure
-  public abstract int getClientPoolSize();
-
-  @Pure
-  public abstract long getClientPoolEvictionIntervalMs();
-
-  @Pure
-  public abstract @Nullable String getClientPoolCacheKeys();
-
-  @Pure
-  public abstract @Nullable String getLockImplementation();
-
-  @Pure
-  public abstract long getLockHeartbeatIntervalMillis();
-
-  @Pure
-  public abstract long getLockHeartbeatTimeoutMillis();
-
-  @Pure
-  public abstract int getLockHeartbeatThreads();
-
-  @Pure
-  public abstract long getLockAcquireIntervalMillis();
-
-  @Pure
-  public abstract long getLockAcquireTimeoutMillis();
-
-  @Pure
-  public abstract @Nullable String getAppIdentifier();
-
-  @Pure
-  public abstract @Nullable String getUser();
-
   @Pure
-  public abstract long getAuthSessionTimeoutMillis();
+  public abstract String getCatalogName();
 
   @Pure
-  public abstract @Nullable Configuration getConfiguration();
+  public abstract Properties getProperties();
 
   @Pure
   public static Builder builder() {
-    return new AutoValue_IcebergCatalogConfig.Builder()
-        .setIcebergCatalogType(null)
-        .setCatalogImplementation(null)
-        .setFileIOImplementation(null)
-        .setWarehouseLocation(null)
-        .setMetricsReporterImplementation(null) // TODO: Set this to our 
implementation
-        .setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
-        .setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
-        
.setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
-        
.setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
-        .setIOManifestCacheExpirationIntervalMillis(
-            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
-        .setIOManifestCacheMaxTotalBytes(
-            CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
-        .setIOManifestCacheMaxContentLength(
-            CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
-        .setUri(null)
-        .setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
-        .setClientPoolEvictionIntervalMs(
-            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
-        .setClientPoolCacheKeys(null)
-        .setLockImplementation(null)
-        
.setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
-        
.setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
-        
.setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
-        
.setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
-        
.setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
-        .setAppIdentifier(null)
-        .setUser(null)
-        
.setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
-        .setConfiguration(null);
-  }
-
-  @Pure
-  public ImmutableMap<String, String> properties() {
-    return new PropertyBuilder()
-        .put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
-        .put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
-        .put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
-        .put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
-        .put(CatalogProperties.METRICS_REPORTER_IMPL, 
getMetricsReporterImplementation())
-        .put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
-        .put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
-        .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, 
getCacheExpirationIntervalMillis())
-        .build();
+    return new AutoValue_IcebergCatalogConfig.Builder();
   }
 
   public org.apache.iceberg.catalog.Catalog catalog() {
-    Configuration conf = getConfiguration();
-    if (conf == null) {
-      conf = new Configuration();
-    }
-    return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
+    return CatalogUtil.buildIcebergCatalog(
+        getCatalogName(), Maps.fromProperties(getProperties()), new 
Configuration());
   }
 
   @AutoValue.Builder
   public abstract static class Builder {
+    public abstract Builder setCatalogName(String catalogName);
 
-    /* Core Properties */
-    public abstract Builder setName(String name);
-
-    public abstract Builder setIcebergCatalogType(@Nullable String 
icebergType);
-
-    public abstract Builder setCatalogImplementation(@Nullable String 
catalogImpl);
-
-    public abstract Builder setFileIOImplementation(@Nullable String 
fileIOImpl);
-
-    public abstract Builder setWarehouseLocation(@Nullable String warehouse);
-
-    public abstract Builder setMetricsReporterImplementation(@Nullable String 
metricsImpl);
-
-    /* Caching */
-    public abstract Builder setCacheEnabled(boolean cacheEnabled);
-
-    public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);
-
-    public abstract Builder setCacheExpirationIntervalMillis(long expiration);
-
-    public abstract Builder setIOManifestCacheEnabled(boolean enabled);
-
-    public abstract Builder setIOManifestCacheExpirationIntervalMillis(long 
expiration);
-
-    public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);
-
-    public abstract Builder setIOManifestCacheMaxContentLength(long length);
-
-    public abstract Builder setUri(@Nullable String uri);
-
-    public abstract Builder setClientPoolSize(int size);
-
-    public abstract Builder setClientPoolEvictionIntervalMs(long interval);
-
-    public abstract Builder setClientPoolCacheKeys(@Nullable String keys);
-
-    public abstract Builder setLockImplementation(@Nullable String 
lockImplementation);
-
-    public abstract Builder setLockHeartbeatIntervalMillis(long interval);
-
-    public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);
-
-    public abstract Builder setLockHeartbeatThreads(int threads);
-
-    public abstract Builder setLockAcquireIntervalMillis(long interval);
-
-    public abstract Builder setLockAcquireTimeoutMillis(long timeout);
-
-    public abstract Builder setAppIdentifier(@Nullable String id);
-
-    public abstract Builder setUser(@Nullable String user);
-
-    public abstract Builder setAuthSessionTimeoutMillis(long timeout);
-
-    public abstract Builder setConfiguration(@Nullable Configuration conf);
+    public abstract Builder setProperties(Properties props);
 
     public abstract IcebergCatalogConfig build();
   }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18d937..ef535353efd 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -21,19 +21,21 @@ import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import 
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 /**
@@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends 
TypedSchemaTransformProv
 
   @Override
   protected SchemaTransform from(Config configuration) {
-    configuration.validate();
     return new IcebergReadSchemaTransform(configuration);
   }
 
@@ -68,21 +69,24 @@ public class IcebergReadSchemaTransformProvider extends 
TypedSchemaTransformProv
       return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
     }
 
+    @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
     public abstract String getTable();
 
-    public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+    @SchemaFieldDescription("Name of the catalog containing the table.")
+    public abstract String getCatalogName();
+
+    @SchemaFieldDescription("Configuration properties used to set up the 
Iceberg catalog.")
+    public abstract Map<String, String> getCatalogProperties();
 
     @AutoValue.Builder
     public abstract static class Builder {
-      public abstract Builder setTable(String tables);
+      public abstract Builder setTable(String table);
 
-      public abstract Builder 
setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+      public abstract Builder setCatalogName(String catalogName);
 
-      public abstract Config build();
-    }
+      public abstract Builder setCatalogProperties(Map<String, String> 
catalogProperties);
 
-    public void validate() {
-      getCatalogConfig().validate();
+      public abstract Config build();
     }
   }
 
@@ -109,17 +113,13 @@ public class IcebergReadSchemaTransformProvider extends 
TypedSchemaTransformProv
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      IcebergSchemaTransformCatalogConfig catalogConfig = 
configuration.getCatalogConfig();
+      Properties properties = new Properties();
+      properties.putAll(configuration.getCatalogProperties());
 
       IcebergCatalogConfig.Builder catalogBuilder =
-          
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
-
-      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
-        catalogBuilder = 
catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
-      }
-      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
-        catalogBuilder = 
catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
-      }
+          IcebergCatalogConfig.builder()
+              .setCatalogName(configuration.getCatalogName())
+              .setProperties(properties);
 
       PCollection<Row> output =
           input
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
deleted file mode 100644
index 87b3d204198..00000000000
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.value.AutoValue;
-import java.util.Set;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.util.Preconditions;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.apache.iceberg.CatalogUtil;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class IcebergSchemaTransformCatalogConfig {
-  public static Builder builder() {
-    return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder();
-  }
-
-  public abstract String getCatalogName();
-
-  @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}")
-  public abstract @Nullable String getCatalogType();
-
-  public abstract @Nullable String getCatalogImplementation();
-
-  public abstract @Nullable String getWarehouseLocation();
-
-  @AutoValue.Builder
-  public abstract static class Builder {
-
-    public abstract Builder setCatalogName(String catalogName);
-
-    public abstract Builder setCatalogType(String catalogType);
-
-    public abstract Builder setCatalogImplementation(String 
catalogImplementation);
-
-    public abstract Builder setWarehouseLocation(String warehouseLocation);
-
-    public abstract IcebergSchemaTransformCatalogConfig build();
-  }
-
-  public static final Schema SCHEMA;
-
-  static {
-    try {
-      // To stay consistent with our SchemaTransform configuration naming 
conventions,
-      // we sort lexicographically and convert field names to snake_case
-      SCHEMA =
-          SchemaRegistry.createDefault()
-              .getSchema(IcebergSchemaTransformCatalogConfig.class)
-              .sorted()
-              .toSnakeCase();
-    } catch (NoSuchSchemaException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @SuppressWarnings("argument")
-  public Row toRow() {
-    return Row.withSchema(SCHEMA)
-        .withFieldValue("catalog_name", getCatalogName())
-        .withFieldValue("catalog_type", getCatalogType())
-        .withFieldValue("catalog_implementation", getCatalogImplementation())
-        .withFieldValue("warehouse_location", getWarehouseLocation())
-        .build();
-  }
-
-  public static final Set<String> VALID_CATALOG_TYPES =
-      Sets.newHashSet(
-          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
-          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-
-  public void validate() {
-    if (!Strings.isNullOrEmpty(getCatalogType())) {
-      checkArgument(
-          
VALID_CATALOG_TYPES.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
-          "Invalid catalog type. Please pick one of %s",
-          VALID_CATALOG_TYPES);
-    }
-  }
-}
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b490693a9ad..b3de7a88c54 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -21,6 +21,8 @@ import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import 
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -39,7 +41,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 /**
@@ -64,7 +65,6 @@ public class IcebergWriteSchemaTransformProvider extends 
TypedSchemaTransformPro
 
   @Override
   protected SchemaTransform from(Config configuration) {
-    configuration.validate();
     return new IcebergWriteSchemaTransform(configuration);
   }
 
@@ -93,20 +93,21 @@ public class IcebergWriteSchemaTransformProvider extends 
TypedSchemaTransformPro
     @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
     public abstract String getTable();
 
-    @SchemaFieldDescription("Configuration parameters used to set up the 
Iceberg catalog.")
-    public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+    @SchemaFieldDescription("Name of the catalog containing the table.")
+    public abstract String getCatalogName();
+
+    @SchemaFieldDescription("Configuration properties used to set up the 
Iceberg catalog.")
+    public abstract Map<String, String> getCatalogProperties();
 
     @AutoValue.Builder
     public abstract static class Builder {
-      public abstract Builder setTable(String tables);
+      public abstract Builder setTable(String table);
 
-      public abstract Builder 
setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+      public abstract Builder setCatalogName(String catalogName);
 
-      public abstract Config build();
-    }
+      public abstract Builder setCatalogProperties(Map<String, String> 
catalogProperties);
 
-    public void validate() {
-      getCatalogConfig().validate();
+      public abstract Config build();
     }
   }
 
@@ -133,26 +134,21 @@ public class IcebergWriteSchemaTransformProvider extends 
TypedSchemaTransformPro
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-
       PCollection<Row> rows = input.get(INPUT_TAG);
 
-      IcebergSchemaTransformCatalogConfig catalogConfig = 
configuration.getCatalogConfig();
-
-      IcebergCatalogConfig.Builder catalogBuilder =
-          
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
+      Properties properties = new Properties();
+      properties.putAll(configuration.getCatalogProperties());
 
-      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
-        catalogBuilder = 
catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
-      }
-      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
-        catalogBuilder = 
catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
-      }
+      IcebergCatalogConfig catalog =
+          IcebergCatalogConfig.builder()
+              .setCatalogName(configuration.getCatalogName())
+              .setProperties(properties)
+              .build();
 
       // TODO: support dynamic destinations
       IcebergWriteResult result =
           rows.apply(
-              IcebergIO.writeRows(catalogBuilder.build())
-                  .to(TableIdentifier.parse(configuration.getTable())));
+              
IcebergIO.writeRows(catalog).to(TableIdentifier.parse(configuration.getTable())));
 
       PCollection<Row> snapshots =
           result
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 467a2cbaf24..0420e2f5779 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -206,12 +206,12 @@ public class IcebergIOIT implements Serializable {
     Map<String, Object> config =
         ImmutableMap.<String, Object>builder()
             .put("table", tableId.toString())
+            .put("catalog_name", "test-name")
             .put(
-                "catalog_config",
+                "catalog_properties",
                 ImmutableMap.<String, String>builder()
-                    .put("catalog_name", "hadoop")
-                    .put("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .put("warehouse_location", warehouseLocation)
+                    .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+                    .put("warehouse", warehouseLocation)
                     .build())
             .build();
 
@@ -246,12 +246,12 @@ public class IcebergIOIT implements Serializable {
     Map<String, Object> config =
         ImmutableMap.<String, Object>builder()
             .put("table", tableId.toString())
+            .put("catalog_name", "test-name")
             .put(
-                "catalog_config",
+                "catalog_properties",
                 ImmutableMap.<String, String>builder()
-                    .put("catalog_name", "hadoop")
-                    .put("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .put("warehouse_location", warehouseLocation)
+                    .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+                    .put("warehouse", warehouseLocation)
                     .build())
             .build();
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 12d86811e60..d6db3f68911 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -93,12 +94,12 @@ public class IcebergIOReadTest {
             .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
             .collect(Collectors.toList());
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalogConfig =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     PCollection<Row> output =
         testPipeline
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e04eaf48cb3..e0a584ec9da 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -75,12 +76,12 @@ public class IcebergIOWriteTest implements Serializable {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     testPipeline
         .apply("Records To Add", 
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
@@ -109,12 +110,12 @@ public class IcebergIOWriteTest implements Serializable {
     Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
     Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
@@ -199,12 +200,12 @@ public class IcebergIOWriteTest implements Serializable {
       elementsPerTable.computeIfAbsent(tableId, ignored -> 
Lists.newArrayList()).add(element);
     }
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index 46168a487dd..bc15021fa2b 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -52,16 +53,15 @@ public class IcebergReadSchemaTransformProviderTest {
 
   @Test
   public void testBuildTransformWithRow() {
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_name")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", "test_location")
-            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", "test_location");
+
     Row transformConfigRow =
         Row.withSchema(new 
IcebergReadSchemaTransformProvider().configurationSchema())
             .withFieldValue("table", "test_table_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", properties)
             .build();
 
     new IcebergReadSchemaTransformProvider().from(transformConfigRow);
@@ -97,17 +97,15 @@ public class IcebergReadSchemaTransformProviderTest {
             .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
             .collect(Collectors.toList());
 
-    IcebergSchemaTransformCatalogConfig catalogConfig =
-        IcebergSchemaTransformCatalogConfig.builder()
-            .setCatalogName("hadoop")
-            .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", warehouse.location);
 
     IcebergReadSchemaTransformProvider.Config readConfig =
         IcebergReadSchemaTransformProvider.Config.builder()
             .setTable(identifier)
-            .setCatalogConfig(catalogConfig)
+            .setCatalogName("name")
+            .setCatalogProperties(properties)
             .build();
 
     PCollection<Row> output =
@@ -158,10 +156,10 @@ public class IcebergReadSchemaTransformProviderTest {
     String yamlConfig =
         String.format(
             "table: %s\n"
-                + "catalog_config: \n"
-                + "  catalog_name: hadoop\n"
-                + "  catalog_type: %s\n"
-                + "  warehouse_location: %s",
+                + "catalog_name: test-name\n"
+                + "catalog_properties: \n"
+                + "  type: %s\n"
+                + "  warehouse: %s",
             identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, 
warehouse.location);
     Map<String, Object> configMap = new Yaml().load(yamlConfig);
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
index fb4c98cb0bd..7863f7812a1 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
@@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.junit.ClassRule;
@@ -63,18 +66,19 @@ public class IcebergSchemaTransformTranslationTest {
   static final IcebergReadSchemaTransformProvider READ_PROVIDER =
       new IcebergReadSchemaTransformProvider();
 
+  private static final Map<String, String> CATALOG_PROPERTIES =
+      ImmutableMap.<String, String>builder()
+          .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+          .put("warehouse", "test_location")
+          .build();
+
   @Test
   public void testReCreateWriteTransformFromRow() {
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_name")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", "test_location")
-            .build();
     Row transformConfigRow =
         Row.withSchema(WRITE_PROVIDER.configurationSchema())
             .withFieldValue("table", "test_table_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
             .build();
     IcebergWriteSchemaTransform writeTransform =
         (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow);
@@ -101,17 +105,11 @@ public class IcebergSchemaTransformTranslationTest {
                     
Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build())))
             .setRowSchema(inputSchema);
 
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_catalog")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("catalog_implementation", "test_implementation")
-            .withFieldValue("warehouse_location", warehouse.location)
-            .build();
     Row transformConfigRow =
         Row.withSchema(WRITE_PROVIDER.configurationSchema())
             .withFieldValue("table", "test_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
             .build();
 
     IcebergWriteSchemaTransform writeTransform =
@@ -158,16 +156,11 @@ public class IcebergSchemaTransformTranslationTest {
   @Test
   public void testReCreateReadTransformFromRow() {
     // setting a subset of fields here.
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_name")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", "test_location")
-            .build();
     Row transformConfigRow =
         Row.withSchema(READ_PROVIDER.configurationSchema())
             .withFieldValue("table", "test_table_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
             .build();
 
     IcebergReadSchemaTransform readTransform =
@@ -188,19 +181,17 @@ public class IcebergSchemaTransformTranslationTest {
       throws InvalidProtocolBufferException, IOException {
     // First build a pipeline
     Pipeline p = Pipeline.create();
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_catalog")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", warehouse.location)
-            .build();
     String identifier = "default.table_" + 
Long.toString(UUID.randomUUID().hashCode(), 16);
     warehouse.createTable(TableIdentifier.parse(identifier), 
TestFixtures.SCHEMA);
 
+    Map<String, String> properties = new HashMap<>(CATALOG_PROPERTIES);
+    properties.put("warehouse", warehouse.location);
+
     Row transformConfigRow =
         Row.withSchema(READ_PROVIDER.configurationSchema())
             .withFieldValue("table", identifier)
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", properties)
             .build();
 
     IcebergReadSchemaTransform readTransform =
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 9ef3e9945ec..75884f4bcf7 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -61,16 +62,15 @@ public class IcebergWriteSchemaTransformProviderTest {
 
   @Test
   public void testBuildTransformWithRow() {
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_name")
-            .withFieldValue("catalog_type", 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", "test_location")
-            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", "test_location");
+
     Row transformConfigRow =
         Row.withSchema(new 
IcebergWriteSchemaTransformProvider().configurationSchema())
             .withFieldValue("table", "test_table_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", properties)
             .build();
 
     new IcebergWriteSchemaTransformProvider().from(transformConfigRow);
@@ -85,15 +85,15 @@ public class IcebergWriteSchemaTransformProviderTest {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", warehouse.location);
+
     Config config =
         Config.builder()
             .setTable(identifier)
-            .setCatalogConfig(
-                IcebergSchemaTransformCatalogConfig.builder()
-                    .setCatalogName("hadoop")
-                    .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .setWarehouseLocation(warehouse.location)
-                    .build())
+            .setCatalogName("name")
+            .setCatalogProperties(properties)
             .build();
 
     PCollectionRowTuple input =
@@ -127,10 +127,10 @@ public class IcebergWriteSchemaTransformProviderTest {
     String yamlConfig =
         String.format(
             "table: %s\n"
-                + "catalog_config: \n"
-                + "  catalog_name: hadoop\n"
-                + "  catalog_type: %s\n"
-                + "  warehouse_location: %s",
+                + "catalog_name: test-name\n"
+                + "catalog_properties: \n"
+                + "  type: %s\n"
+                + "  warehouse: %s",
             identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, 
warehouse.location);
     Map<String, Object> configMap = new Yaml().load(yamlConfig);
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index c7d5353428c..143687e3c99 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.iceberg;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -64,14 +65,17 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))
@@ -103,14 +107,17 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))
@@ -146,14 +153,17 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        
.setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))


Reply via email to