This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b21a84a4cd6 Managed Iceberg hive support and integration tests (#32052)
b21a84a4cd6 is described below

commit b21a84a4cd607f58a3794e274a913ca48da2e42c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Aug 9 17:38:35 2024 -0400

    Managed Iceberg hive support and integration tests (#32052)
    
    * iceberg hive support and integration tests
    
    * split read and write tests; cleanup
    
    * add test documentation
    
    * extend new config_properties arg to translation tests
    
    * revert beam schema override
    
    * actually run hive ITs
    
    * trigger integration tests
    
    * cut down hive database source lines
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 .github/workflows/IO_Iceberg_Integration_Tests.yml |   2 +-
 sdks/java/io/iceberg/build.gradle                  |   5 +
 sdks/java/io/iceberg/hive/build.gradle             |  80 ++++++
 sdks/java/io/iceberg/hive/exec/build.gradle        |  58 +++++
 .../sdk/io/iceberg/hive/IcebergHiveCatalogIT.java  | 280 +++++++++++++++++++++
 .../hive/testutils/HiveMetastoreExtension.java     |  68 +++++
 .../io/iceberg/hive/testutils/ScriptRunner.java    | 203 +++++++++++++++
 .../iceberg/hive/testutils/TestHiveMetastore.java  | 273 ++++++++++++++++++++
 .../src/test/resources/hive-schema-3.1.0.derby.sql | 267 ++++++++++++++++++++
 .../beam/sdk/io/iceberg/IcebergCatalogConfig.java  |  37 ++-
 .../IcebergReadSchemaTransformProvider.java        |  56 +----
 .../IcebergWriteSchemaTransformProvider.java       |  58 +----
 .../io/iceberg/SchemaTransformConfiguration.java   |  69 +++++
 .../beam/sdk/io/iceberg/IcebergIOReadTest.java     |  16 +-
 .../beam/sdk/io/iceberg/IcebergIOWriteTest.java    |  40 ++-
 .../IcebergReadSchemaTransformProviderTest.java    |   4 +-
 .../IcebergSchemaTransformTranslationTest.java     |   6 +
 .../IcebergWriteSchemaTransformProviderTest.java   |   5 +-
 .../apache/beam/sdk/io/iceberg/ScanSourceTest.java |  33 ++-
 settings.gradle.kts                                |   4 +
 21 files changed, 1423 insertions(+), 143 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910e..62ae7886c57 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "modification": 3
+    "modification": 4
 }
diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml 
b/.github/workflows/IO_Iceberg_Integration_Tests.yml
index 20d1f4bb60f..22b2b4f9287 100644
--- a/.github/workflows/IO_Iceberg_Integration_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml
@@ -75,4 +75,4 @@ jobs:
       - name: Run IcebergIO Integration Test
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:java:io:iceberg:integrationTest
\ No newline at end of file
+          gradle-command: :sdks:java:io:iceberg:catalogTests
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index 7965cde86e7..3d653d6b276 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -115,6 +115,11 @@ task integrationTest(type: Test) {
     testClassesDirs = sourceSets.test.output.classesDirs
 }
 
+tasks.register('catalogTests') {
+    dependsOn integrationTest
+    dependsOn ":sdks:java:io:iceberg:hive:integrationTest"
+}
+
 task loadTest(type: Test) {
     def gcpProject = project.findProperty('gcpProject') ?: 
'apache-beam-testing'
     def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 
'gs://temp-storage-for-end-to-end-tests/temp-lt'
diff --git a/sdks/java/io/iceberg/hive/build.gradle 
b/sdks/java/io/iceberg/hive/build.gradle
new file mode 100644
index 00000000000..b81867ec90c
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/build.gradle
@@ -0,0 +1,80 @@
+import groovy.json.JsonOutput
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.iceberg.hive',
+        exportJavadoc: false,
+        shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive"
+ext.summary = "Runtime dependencies needed for Hive catalog integration."
+
+def hive_version = "3.1.3"
+def iceberg_version = "1.4.2"
+
+dependencies {
+    // dependencies needed to run with iceberg's hive catalog
+    runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+    runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", 
configuration: "shadow")
+    runtimeOnly library.java.bigdataoss_gcs_connector
+    runtimeOnly library.java.hadoop_client
+
+    // ----- below dependencies are for testing and will not appear in the 
shaded jar -----
+    // Beam IcebergIO dependencies
+    testImplementation project(path: ":sdks:java:core", configuration: 
"shadow")
+    testImplementation project(":sdks:java:managed")
+    testImplementation project(":sdks:java:io:iceberg")
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
+    testRuntimeOnly library.java.snake_yaml
+
+    // needed to set up the test environment
+    testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
+    testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version"
+    testImplementation "org.assertj:assertj-core:3.11.1"
+    testImplementation library.java.junit
+
+    // needed to set up test Hive Metastore and run tests
+    testImplementation 
("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+    testImplementation project(path: ":sdks:java:io:iceberg:hive:exec", 
configuration: "shadow")
+    testRuntimeOnly 
("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
+        exclude group: "org.apache.hive", module: "hive-exec"
+        exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
+    }
+    testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
+    testImplementation "org.apache.parquet:parquet-column:1.12.0"
+}
+
+task integrationTest(type: Test) {
+    group = "Verification"
+    def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 
'gs://temp-storage-for-end-to-end-tests/iceberg-hive-it'
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+            "--tempLocation=${gcpTempLocation}",
+    ])
+
+    // Disable Gradle cache: these ITs interact with a live service that should always be
+    // considered "out of date"
+    outputs.upToDateWhen { false }
+
+    include '**/*IT.class'
+
+    maxParallelForks 4
+    classpath = sourceSets.test.runtimeClasspath
+    testClassesDirs = sourceSets.test.output.classesDirs
+}
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/hive/exec/build.gradle 
b/sdks/java/io/iceberg/hive/exec/build.gradle
new file mode 100644
index 00000000000..581f71ddedd
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/exec/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+    id 'java'
+    id 'com.github.johnrengelman.shadow'
+}
+
+dependencies {
+    implementation("org.apache.hive:hive-exec:3.1.3")
+    permitUnusedDeclared("org.apache.hive:hive-exec:3.1.3")
+}
+
+configurations {
+    shadow
+}
+
+artifacts {
+    shadow(archives(shadowJar) {
+        builtBy shadowJar
+    })
+}
+
+shadowJar {
+    zip64 true
+
+    // relocate the protobuf and parquet classes bundled with hive-exec to avoid version conflicts
+    relocate 'com.google.protobuf', getJavaRelocatedPath('com.google.protobuf')
+    relocate 'shaded.parquet', getJavaRelocatedPath('shaded.parquet')
+    relocate 'org.apache.parquet', getJavaRelocatedPath('org.apache.parquet')
+
+    version "3.1.3"
+    mergeServiceFiles()
+
+    exclude 'LICENSE'
+    exclude(
+            'org/xml/**',
+            'javax/**',
+            'com/sun/**'
+    )
+}
+description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive :: Exec"
+ext.summary = "A copy of the hive-exec dependency with some popular libraries 
relocated."
diff --git 
a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
new file mode 100644
index 00000000000..54a4998d37f
--- /dev/null
+++ 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Read and write test for {@link Managed} {@link 
org.apache.beam.sdk.io.iceberg.IcebergIO} using
+ * {@link HiveCatalog}.
+ *
+ * <p>Spins up a local Hive metastore to manage the Iceberg table. Warehouse 
path is set to a GCS
+ * bucket.
+ */
+public class IcebergHiveCatalogIT {
+  private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+      Schema.builder()
+          .addStringField("doubly_nested_str")
+          .addInt64Field("doubly_nested_float")
+          .build();
+
+  private static final Schema NESTED_ROW_SCHEMA =
+      Schema.builder()
+          .addStringField("nested_str")
+          .addInt32Field("nested_int")
+          .addFloatField("nested_float")
+          .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+          .build();
+  private static final Schema BEAM_SCHEMA =
+      Schema.builder()
+          .addStringField("str")
+          .addBooleanField("bool")
+          .addNullableInt32Field("nullable_int")
+          .addNullableInt64Field("nullable_long")
+          .addArrayField("arr_long", Schema.FieldType.INT64)
+          .addRowField("row", NESTED_ROW_SCHEMA)
+          .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+          .build();
+
+  private static final SimpleFunction<Long, Row> ROW_FUNC =
+      new SimpleFunction<Long, Row>() {
+        @Override
+        public Row apply(Long num) {
+          String strNum = Long.toString(num);
+          Row nestedRow =
+              Row.withSchema(NESTED_ROW_SCHEMA)
+                  .addValue("nested_str_value_" + strNum)
+                  .addValue(Integer.valueOf(strNum))
+                  .addValue(Float.valueOf(strNum + "." + strNum))
+                  .addValue(
+                      Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+                          .addValue("doubly_nested_str_value_" + strNum)
+                          .addValue(num)
+                          .build())
+                  .build();
+
+          return Row.withSchema(BEAM_SCHEMA)
+              .addValue("str_value_" + strNum)
+              .addValue(num % 2 == 0)
+              .addValue(Integer.valueOf(strNum))
+              .addValue(num)
+              .addValue(LongStream.range(1, num % 
10).boxed().collect(Collectors.toList()))
+              .addValue(nestedRow)
+              .addValue(num % 2 == 0 ? null : nestedRow)
+              .build();
+        }
+      };
+
+  private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
+      IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+  private static final SimpleFunction<Row, Record> RECORD_FUNC =
+      new SimpleFunction<Row, Record>() {
+        @Override
+        public Record apply(Row input) {
+          return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
+        }
+      };
+
+  private static HiveMetastoreExtension hiveMetastoreExtension;
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  private static final String TEST_CATALOG = "test_catalog";
+  private static final String TEST_TABLE = "test_table";
+  private static HiveCatalog catalog;
+  private static final String TEST_DB = "test_db_" + System.nanoTime();
+
+  @BeforeClass
+  public static void setUp() throws TException {
+    String warehousePath = 
TestPipeline.testingPipelineOptions().getTempLocation();
+    hiveMetastoreExtension = new HiveMetastoreExtension(warehousePath);
+    catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(),
+                TEST_CATALOG,
+                ImmutableMap.of(
+                    CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+                    String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+                hiveMetastoreExtension.hiveConf());
+
+    String dbPath = 
hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
+    Database db = new Database(TEST_DB, "description", dbPath, 
Maps.newHashMap());
+    hiveMetastoreExtension.metastoreClient().createDatabase(db);
+  }
+
+  /** Shuts down the Hive metastore helper created in {@link #setUp()}, if it was created. */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    // @AfterClass still runs when setUp() fails part-way; avoid piling an NPE on top of that
+    if (hiveMetastoreExtension != null) {
+      hiveMetastoreExtension.cleanup();
+    }
+  }
+
+  private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
+    String metastoreUri = 
hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
+
+    Map<String, String> confProperties =
+        ImmutableMap.<String, String>builder()
+            .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
+            .build();
+
+    return ImmutableMap.<String, Object>builder()
+        .put("table", table.toString())
+        .put("config_properties", confProperties)
+        .build();
+  }
+
+  /**
+   * Writes records directly into an Iceberg table created through the Hive catalog, then checks
+   * that a Managed Iceberg read of that table returns the same rows.
+   */
+  @Test
+  public void testReadWithHiveCatalog() throws IOException {
+    TableIdentifier tableIdentifier =
+        TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_read_test"));
+    Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
+
+    List<Row> expectedRows =
+        LongStream.range(1, 1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
+    List<Record> records =
+        expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
+
+    // write iceberg records with hive catalog
+    String filepath = table.location() + "/" + UUID.randomUUID();
+    DataWriter<Record> writer =
+        Parquet.writeData(table.io().newOutputFile(filepath))
+            .schema(ICEBERG_SCHEMA)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .overwrite()
+            .withSpec(table.spec())
+            .build();
+    // Close the data writer even when a write fails; it is closed before toDataFile() is called,
+    // mirroring the manifest writer handling below.
+    try (DataWriter<Record> dataWriter = writer) {
+      for (Record rec : records) {
+        dataWriter.write(rec);
+      }
+    }
+    AppendFiles appendFiles = table.newAppend();
+    String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
+    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
+    ManifestWriter<DataFile> manifestWriter;
+    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
+      openWriter.add(writer.toDataFile());
+      manifestWriter = openWriter;
+    }
+    appendFiles.appendManifest(manifestWriter.toManifestFile());
+    appendFiles.commit();
+
+    // Run Managed Iceberg read
+    PCollection<Row> outputRows =
+        readPipeline
+            .apply(
+                Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
+            .getSinglePCollection();
+    PAssert.that(outputRows).containsInAnyOrder(expectedRows);
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Writes rows with a Managed Iceberg write against a table created through the Hive catalog,
+   * then scans the table's Parquet files and checks that the written records match the input.
+   */
+  @Test
+  public void testWriteWithHiveCatalog() throws IOException {
+    TableIdentifier tableIdentifier =
+        TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_write_test"));
+    catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
+
+    List<Row> inputRows =
+        LongStream.range(1, 1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
+    List<Record> expectedRecords =
+        inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
+
+    // Run Managed Iceberg write
+    writePipeline
+        .apply(Create.of(inputRows))
+        .setRowSchema(BEAM_SCHEMA)
+        .apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
+    writePipeline.run().waitUntilFinish();
+
+    // read back the records and check everything's there
+    Table table = catalog.loadTable(tableIdentifier);
+    TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
+    List<Record> writtenRecords = new ArrayList<>();
+    // both the planned tasks and the per-file readers are closeable; release them when done
+    try (CloseableIterable<CombinedScanTask> tasks = tableScan.planTasks()) {
+      for (CombinedScanTask task : tasks) {
+        InputFilesDecryptor decryptor =
+            new InputFilesDecryptor(task, table.io(), table.encryption());
+        for (FileScanTask fileTask : task.files()) {
+          InputFile inputFile = decryptor.getInputFile(fileTask);
+          try (CloseableIterable<Record> fileRecords =
+              Parquet.read(inputFile)
+                  .split(fileTask.start(), fileTask.length())
+                  .project(ICEBERG_SCHEMA)
+                  .createReaderFunc(
+                      fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
+                  .filter(fileTask.residual())
+                  .build()) {
+            for (Record rec : fileRecords) {
+              writtenRecords.add(rec);
+            }
+          }
+        }
+      }
+    }
+    // actual first, expected in the matcher, so a failure message reads the right way round
+    assertThat(writtenRecords, containsInAnyOrder(expectedRecords.toArray()));
+  }
+}
diff --git 
a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
new file mode 100644
index 00000000000..52de1b91a21
--- /dev/null
+++ 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A class that interacts with {@link TestHiveMetastore}.
+ *
+ * <p>The constructor starts the metastore and opens a client against it; {@link #cleanup()}
+ * releases both and clears the references, so {@link #hiveConf()} must not be called afterwards.
+ *
+ * <p>Trimmed down from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java">Iceberg's
+ * integration testing util</a>
+ */
+public class HiveMetastoreExtension {
+  private HiveMetaStoreClient metastoreClient;
+  private TestHiveMetastore metastore;
+
+  /**
+   * Starts a {@link TestHiveMetastore} and connects a {@link HiveMetaStoreClient} to it.
+   *
+   * @param warehousePath handed to the {@link TestHiveMetastore} constructor
+   * @throws MetaException propagated from metastore start-up or client creation
+   */
+  public HiveMetastoreExtension(String warehousePath) throws MetaException {
+    metastore = new TestHiveMetastore(warehousePath);
+    HiveConf hiveConf = new HiveConf(TestHiveMetastore.class);
+
+    metastore.start(hiveConf);
+    metastoreClient = new HiveMetaStoreClient(hiveConf);
+  }
+
+  /**
+   * Closes the client, then resets and stops the metastore.
+   *
+   * <p>Each step runs even if an earlier one throws, and both references are always cleared, so a
+   * failing {@code close()} or {@code reset()} cannot leave the metastore running.
+   */
+  public void cleanup() throws Exception {
+    try {
+      if (metastoreClient != null) {
+        metastoreClient.close();
+      }
+    } finally {
+      metastoreClient = null;
+      TestHiveMetastore toStop = metastore;
+      metastore = null;
+      if (toStop != null) {
+        try {
+          toStop.reset();
+        } finally {
+          toStop.stop();
+        }
+      }
+    }
+  }
+
+  /** Returns the client connected to the test metastore, or null after {@link #cleanup()}. */
+  public HiveMetaStoreClient metastoreClient() {
+    return metastoreClient;
+  }
+
+  /** Returns the configuration reported by the running test metastore. */
+  public HiveConf hiveConf() {
+    return metastore.hiveConf();
+  }
+
+  /** Returns the test metastore, or null after {@link #cleanup()}. */
+  public TestHiveMetastore metastore() {
+    return metastore;
+  }
+}
diff --git 
a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
new file mode 100644
index 00000000000..adf941e00b4
--- /dev/null
+++ 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Tool to run database scripts.
+ *
+ * <p>The script is read line by line. Lines starting with {@code --} are echoed to the log, blank
+ * lines and lines starting with {@code //} are skipped, and the accumulated text is executed as
+ * one statement whenever a line ends with {@code ;}.
+ *
+ * <p>Copied over from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/ScriptRunner.java">Iceberg's
+ * integration testing</a>
+ */
+@SuppressWarnings({"OperatorPrecedence", "DefaultCharset"})
+public class ScriptRunner {
+
+  private static final String DEFAULT_DELIMITER = ";";
+
+  private final Connection connection;
+
+  private final boolean stopOnError;
+  private final boolean autoCommit;
+
+  private final PrintWriter logWriter = new PrintWriter(System.out);
+  private final PrintWriter errorLogWriter = new PrintWriter(System.err);
+
+  /**
+   * Creates a runner bound to the given connection.
+   *
+   * @param connection the connection scripts are executed on
+   * @param autoCommit the auto-commit mode applied to the connection while a script runs
+   * @param stopOnError if true a failing statement aborts the script; if false the failure is
+   *     logged and execution continues with the next statement
+   */
+  public ScriptRunner(Connection connection, boolean autoCommit, boolean stopOnError) {
+    this.connection = connection;
+    this.autoCommit = autoCommit;
+    this.stopOnError = stopOnError;
+  }
+
+  /**
+   * Runs an SQL script (read in using the Reader parameter).
+   *
+   * <p>The connection's auto-commit mode is switched to this runner's setting for the duration of
+   * the script and restored afterwards.
+   *
+   * @param reader - the source of the script
+   * @throws SQLException if any SQL errors occur
+   * @throws IOException if there is an error reading from the Reader
+   */
+  public void runScript(Reader reader) throws IOException, SQLException {
+    try {
+      boolean originalAutoCommit = connection.getAutoCommit();
+      try {
+        if (originalAutoCommit != this.autoCommit) {
+          connection.setAutoCommit(this.autoCommit);
+        }
+        runScript(connection, reader);
+      } finally {
+        connection.setAutoCommit(originalAutoCommit);
+      }
+    } catch (IOException | SQLException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException("Error running script.  Cause: " + e, e);
+    }
+  }
+
+  /**
+   * Runs an SQL script (read in using the Reader parameter) using the connection passed in.
+   *
+   * @param conn - the connection to use for the script
+   * @param reader - the source of the script
+   * @throws SQLException if any SQL errors occur
+   * @throws IOException if there is an error reading from the Reader
+   */
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
+    StringBuilder command = null;
+    try {
+      LineNumberReader lineReader = new LineNumberReader(reader);
+      String line;
+      while ((line = lineReader.readLine()) != null) {
+        if (command == null) {
+          command = new StringBuilder();
+        }
+        String trimmedLine = line.trim();
+        if (trimmedLine.startsWith("--")) {
+          println(trimmedLine);
+        } else if (trimmedLine.isEmpty() || trimmedLine.startsWith("//")) {
+          // Do nothing
+        } else if (trimmedLine.endsWith(getDelimiter())) {
+          // statement complete: drop the trailing delimiter and execute what has accumulated
+          command.append(line, 0, line.lastIndexOf(getDelimiter()));
+          command.append(" ");
+          executeCommand(conn, command.toString());
+          command = null;
+          Thread.yield();
+        } else {
+          command.append(line);
+          command.append(" ");
+        }
+      }
+      if (!autoCommit) {
+        conn.commit();
+      }
+    } catch (IOException | SQLException e) {
+      // log and rethrow as-is so the original stack trace is preserved
+      printlnError("Error executing: " + command);
+      printlnError(e);
+      throw e;
+    } finally {
+      try {
+        // rollback is only meaningful outside auto-commit mode
+        if (!conn.getAutoCommit()) {
+          conn.rollback();
+        }
+      } finally {
+        flush();
+      }
+    }
+  }
+
+  /** Executes one statement, logs it, prints any result set, and always closes the statement. */
+  private void executeCommand(Connection conn, String sql) throws SQLException {
+    Statement statement = conn.createStatement();
+    try {
+      println(sql);
+
+      boolean hasResults = false;
+      if (stopOnError) {
+        hasResults = statement.execute(sql);
+      } else {
+        try {
+          hasResults = statement.execute(sql);
+        } catch (SQLException e) {
+          printlnError("Error executing: " + sql);
+          printlnError(e);
+        }
+      }
+
+      if (autoCommit && !conn.getAutoCommit()) {
+        conn.commit();
+      }
+
+      if (hasResults) {
+        printResults(statement.getResultSet());
+      }
+    } finally {
+      try {
+        statement.close();
+      } catch (Exception ignored) {
+        // Ignore to workaround a bug in Jakarta DBCP
+      }
+    }
+  }
+
+  /** Prints the column labels and every row of the result set, tab separated. */
+  private void printResults(ResultSet rs) throws SQLException {
+    if (rs == null) {
+      return;
+    }
+    ResultSetMetaData md = rs.getMetaData();
+    int cols = md.getColumnCount();
+    // JDBC column indices are 1-based
+    for (int i = 1; i <= cols; i++) {
+      print(md.getColumnLabel(i) + "\t");
+    }
+    println("");
+    while (rs.next()) {
+      for (int i = 1; i <= cols; i++) {
+        print(rs.getString(i) + "\t");
+      }
+      println("");
+    }
+  }
+
+  private String getDelimiter() {
+    return DEFAULT_DELIMITER;
+  }
+
+  /** Writes to the same buffered log writer as {@link #println}, keeping output in order. */
+  private void print(Object obj) {
+    logWriter.print(obj);
+  }
+
+  private void println(Object obj) {
+    logWriter.println(obj);
+  }
+
+  private void printlnError(Object obj) {
+    errorLogWriter.println(obj);
+  }
+
+  private void flush() {
+    logWriter.flush();
+    errorLogWriter.flush();
+  }
+}
diff --git 
a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
new file mode 100644
index 00000000000..e3af43d58c6
--- /dev/null
+++ 
b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
+import static java.nio.file.attribute.PosixFilePermissions.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.hive.HiveClientPool;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * A Hive Metastore implementation for local testing. Not meant to be used directly. Use {@link
+ * HiveMetastoreExtension} instead.
+ *
+ * <p>Copied over from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
+ * integration testing util</a>
+ */
+public class TestHiveMetastore {
+
+  private static final String DEFAULT_DATABASE_NAME = "default";
+  private static final int DEFAULT_POOL_SIZE = 5;
+
+  // create the metastore handlers based on whether we're working with Hive2 
or Hive3 dependencies
+  // we need to do this because there is a breaking API change between Hive2 
and Hive3
+  private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> 
HMS_HANDLER_CTOR =
+      DynConstructors.builder()
+          .impl(HiveMetaStore.HMSHandler.class, String.class, 
Configuration.class)
+          .impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class)
+          .build();
+
+  private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER =
+      DynMethods.builder("getProxy")
+          .impl(RetryingHMSHandler.class, Configuration.class, 
IHMSHandler.class, boolean.class)
+          .impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, 
boolean.class)
+          .buildStatic();
+
+  // Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various
+  // cleanup duties. These threads are scheduled and executed in a static thread pool
+  // (org.apache.hadoop.hive.metastore.ThreadPool). This thread pool is shut down normally as
+  // part of the JVM shutdown hook, but since we're creating and tearing down multiple metastore
+  // instances within the same JVM, we have to call this cleanup method manually; otherwise,
+  // threads from our previous test suite will be stuck in the pool with stale config, and keep
+  // on being scheduled. This can lead to issues, e.g. accidental Persistence Manager closure by
+  // ScheduledQueryExecutionsMaintTask.
+  private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN =
+      DynMethods.builder("shutdown")
+          .impl("org.apache.hadoop.hive.metastore.ThreadPool")
+          .orNoop()
+          .buildStatic();
+
+  // It's tricky to clear all static fields in an HMS instance in order to 
switch derby root dir.
+  // Therefore, we reuse the same derby root between tests and remove it after 
JVM exits.
+  private static final File HIVE_LOCAL_DIR;
+  private static final String DERBY_PATH;
+
+  // One-time JVM-wide setup: creates the shared temp directory, initializes the Derby metastore
+  // database inside it, and registers a shutdown hook that deletes the directory again.
+  static {
+    try {
+      HIVE_LOCAL_DIR =
+          createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+      DERBY_PATH = HIVE_LOCAL_DIR + "/metastore_db";
+
+      // Redirect Derby's error log into the temp dir before the first Derby connection is opened.
+      System.setProperty(
+          "derby.stream.error.file", new File(HIVE_LOCAL_DIR, "derby.log").getAbsolutePath());
+      setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
+
+      Thread deleteLocalDirHook =
+          new Thread(
+              () -> {
+                Path localDir = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
+                FileSystem localFs = Util.getFs(localDir, new Configuration());
+                String failureMessage = "Failed to delete " + localDir;
+                try {
+                  boolean deleted = localFs.delete(localDir, true);
+                  assertThat(deleted).as(failureMessage).isTrue();
+                } catch (IOException e) {
+                  throw new RuntimeException(failureMessage, e);
+                }
+              });
+      Runtime.getRuntime().addShutdownHook(deleteLocalDirHook);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to setup local dir for hive metastore", e);
+    }
+  }
+
+  private HiveConf hiveConf;
+  private ExecutorService executorService;
+  private TServer server;
+  private HiveMetaStore.HMSHandler baseHandler;
+  private HiveClientPool clientPool;
+  private final String hiveWarehousePath;
+
+  /**
+   * Creates a metastore that has not been started yet.
+   *
+   * @param hiveWarehousePath warehouse location; written into the HiveConf as the metastore
+   *     warehouse setting when the metastore is started
+   */
+  TestHiveMetastore(String hiveWarehousePath) {
+    this.hiveWarehousePath = hiveWarehousePath;
+  }
+
+  /**
+   * Starts a TestHiveMetastore with the provided HiveConf, using the default pool size ({@code
+   * DEFAULT_POOL_SIZE}, 5) for the Thrift server's worker threads.
+   *
+   * @param conf The hive configuration to use; it is updated in place with this metastore's
+   *     connection settings
+   */
+  public void start(HiveConf conf) {
+    start(conf, DEFAULT_POOL_SIZE);
+  }
+
+  /**
+   * Starts a TestHiveMetastore on a free local port with the given worker pool size and HiveConf.
+   *
+   * <p>The supplied configuration is updated in place (metastore URI, warehouse path and test
+   * settings) and is afterwards available through {@link #hiveConf()}.
+   *
+   * @param conf The hive configuration to use
+   * @param poolSize The number of worker threads of the Thrift server (used as both minimum and
+   *     maximum)
+   * @throws RuntimeException wrapping any failure during startup
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public void start(HiveConf conf, int poolSize) {
+    try {
+      // Bind to port 0 and read the actual port back so it can be put into the metastore URI.
+      TServerSocket serverSocket = new TServerSocket(0);
+      initConf(conf, serverSocket.getServerSocket().getLocalPort());
+
+      hiveConf = conf;
+      server = newThriftServer(serverSocket, poolSize, conf);
+      // serve() blocks, so it runs on its own single-thread executor.
+      executorService = Executors.newSingleThreadExecutor();
+      executorService.submit(() -> server.serve());
+      clientPool = new HiveClientPool(1, conf);
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot start TestHiveMetastore", e);
+    }
+  }
+
+  /**
+   * Stops the metastore: resets its contents, then releases the client pool, the Thrift server,
+   * the serving executor, the base handler and the static metastore thread pool.
+   *
+   * <p>The teardown runs in a {@code finally} block so that a failing {@link #reset()} does not
+   * leave the server, its thread and the background metastore threads behind for later tests in
+   * the same JVM. The failure from {@code reset()} still propagates to the caller.
+   *
+   * @throws Exception if resetting the metastore or shutting down a component fails
+   */
+  public void stop() throws Exception {
+    try {
+      reset();
+    } finally {
+      if (clientPool != null) {
+        clientPool.close();
+      }
+      if (server != null) {
+        server.stop();
+      }
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+      if (baseHandler != null) {
+        baseHandler.shutdown();
+      }
+      // No-op when org.apache.hadoop.hive.metastore.ThreadPool is not on the classpath.
+      METASTORE_THREADS_SHUTDOWN.invoke();
+    }
+  }
+
+  /**
+   * Returns the HiveConf this metastore was started with, or {@code null} if {@code start} has
+   * not assigned one yet.
+   */
+  public HiveConf hiveConf() {
+    return hiveConf;
+  }
+
+  /**
+   * Returns the path for the given database under the warehouse path, i.e. {@code
+   * <hiveWarehousePath>/<dbName>.db}.
+   *
+   * @param dbName name of the database
+   */
+  public String getDatabasePath(String dbName) {
+    return hiveWarehousePath + "/" + dbName + ".db";
+  }
+
+  public void reset() throws Exception {
+    if (clientPool != null) {
+      for (String dbName : clientPool.run(client -> client.getAllDatabases())) 
{
+        for (String tblName : clientPool.run(client -> 
client.getAllTables(dbName))) {
+          clientPool.run(
+              client -> {
+                client.dropTable(dbName, tblName, true, true, true);
+                return null;
+              });
+        }
+
+        if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+          // Drop cascade, functions dropped by cascade
+          clientPool.run(
+              client -> {
+                client.dropDatabase(dbName, true, true, true);
+                return null;
+              });
+        }
+      }
+    }
+
+    Path warehouseRoot = new Path(hiveWarehousePath);
+    FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
+    for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
+      if (!fileStatus.getPath().getName().equals("derby.log")
+          && !fileStatus.getPath().getName().equals("metastore_db")) {
+        fs.delete(fileStatus.getPath(), true);
+      }
+    }
+  }
+
+  /**
+   * Builds, without starting, the Thrift server in front of the Derby-backed metastore handler.
+   *
+   * @param socket server socket the Thrift server listens on
+   * @param poolSize used as both the minimum and maximum number of worker threads
+   * @param conf base configuration; a copy of it is pointed at the shared Derby database
+   * @return the configured thread-pool server
+   * @throws Exception if the handlers cannot be created
+   */
+  private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf)
+      throws Exception {
+    String derbyUrl = "jdbc:derby:" + DERBY_PATH + ";create=true";
+    HiveConf metastoreConf = new HiveConf(conf);
+    metastoreConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, derbyUrl);
+
+    // The raw handler is kept in a field so stop() can shut it down; the server uses the proxy.
+    baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", metastoreConf);
+    IHMSHandler proxiedHandler = GET_BASE_HMS_HANDLER.invoke(metastoreConf, baseHandler, false);
+
+    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(socket);
+    serverArgs.processor(new TSetIpAddressProcessor<>(proxiedHandler));
+    serverArgs.transportFactory(new TTransportFactory());
+    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
+    serverArgs.minWorkerThreads(poolSize);
+    serverArgs.maxWorkerThreads(poolSize);
+
+    return new TThreadPoolServer(serverArgs);
+  }
+
+  /**
+   * Applies this test metastore's settings to {@code conf} in place.
+   *
+   * @param conf configuration to update
+   * @param port local port used to build the {@code thrift://localhost:<port>} metastore URI
+   */
+  private void initConf(HiveConf conf, int port) {
+    String metastoreUri = "thrift://localhost:" + port;
+    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);
+    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, hiveWarehousePath);
+    conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
+    conf.set(
+        HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
+    conf.set("iceberg.hive.client-pool-size", "2");
+
+    // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg.
+    HiveConf.ConfVars inTestVar = HiveConf.ConfVars.HIVE_IN_TEST;
+    conf.set(inTestVar.varname, inTestVar.getDefaultValue());
+  }
+
+  /**
+   * Initializes the metastore database by running the bundled Hive schema script against it.
+   *
+   * <p>The JDBC connection and the script stream are only needed for this bootstrap step and are
+   * closed before returning.
+   *
+   * @param dbURL JDBC URL of the database to initialize
+   * @throws SQLException if the connection cannot be opened or closed, or the script fails
+   * @throws IOException if the schema script is missing from the classpath or cannot be read
+   */
+  private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
+    String schemaResource = "hive-schema-3.1.0.derby.sql";
+    ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+
+    try (Connection connection = DriverManager.getConnection(dbURL);
+        InputStream inputStream = classLoader.getResourceAsStream(schemaResource)) {
+      if (inputStream == null) {
+        // getResourceAsStream reports a missing resource with null; fail with a clear message
+        // instead of a NullPointerException from InputStreamReader.
+        throw new IOException(
+            "Hive schema script not found on the classpath: " + schemaResource);
+      }
+      ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
+      try (Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
+        scriptRunner.runScript(reader);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql 
b/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql
new file mode 100644
index 00000000000..808c6058576
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql
@@ -0,0 +1,267 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- This file was copied from Apache Hive, at:
+-- 
https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-3.1.0.derby.sql
+--
+-- This has been modified slightly for compatibility with older Hive versions.
+--
+-- Timestamp: 2011-09-22 15:32:02.024
+-- Source database is: 
/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Connection URL is: 
jdbc:derby:/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Specified schema is: APP
+-- appendLogs: false
+
+-- ----------------------------------------------
+-- DDL Statements for functions
+-- ----------------------------------------------
+
+CREATE FUNCTION "APP"."NUCLEUS_ASCII" (C CHAR(1)) RETURNS INTEGER LANGUAGE 
JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 
'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.ascii' ;
+
+CREATE FUNCTION "APP"."NUCLEUS_MATCHES" (TEXT VARCHAR(8000),PATTERN 
VARCHAR(8000)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL 
DATA CALLED ON NULL INPUT EXTERNAL NAME 
'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.matches' ;
+
+-- ----------------------------------------------
+-- DDL Statements for tables
+-- ----------------------------------------------
+CREATE TABLE "APP"."DBS" (
+  "DB_ID" BIGINT NOT NULL,
+  "DESC" VARCHAR(4000),
+  "DB_LOCATION_URI" VARCHAR(4000) NOT NULL,
+  "NAME" VARCHAR(128),
+  "OWNER_NAME" VARCHAR(128),
+  "OWNER_TYPE" VARCHAR(10),
+  "CTLG_NAME" VARCHAR(256)
+);
+
+CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY" 
VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY" 
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT" 
VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB, 
"INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME" 
VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."CDS" ("CD_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SERDES" ("SERDE_ID" BIGINT NOT NULL, "NAME" VARCHAR(128), 
"SLIB" VARCHAR(4000), "DESCRIPTION" VARCHAR(4000), "SERIALIZER_CLASS" 
VARCHAR(4000), "DESERIALIZER_CLASS" VARCHAR(4000), SERDE_TYPE INTEGER);
+
+CREATE TABLE "APP"."ROLE_MAP" ("ROLE_GRANT_ID" BIGINT NOT NULL, "ADD_TIME" 
INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128), 
"GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE" 
VARCHAR(128), "ROLE_ID" BIGINT);
+
+CREATE TABLE "APP"."GLOBAL_PRIVS" ("USER_GRANT_ID" BIGINT NOT NULL, 
"CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" 
VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), 
"PRINCIPAL_TYPE" VARCHAR(128), "USER_PRIV" VARCHAR(128), "AUTHORIZER" 
VARCHAR(128));
+
+CREATE TABLE "APP"."ROLES" ("ROLE_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER 
NOT NULL, "OWNER_NAME" VARCHAR(128), "ROLE_NAME" VARCHAR(128));
+
+CREATE TABLE "APP"."TBLS" ("TBL_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT 
NULL, "DB_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "OWNER" 
VARCHAR(767), "OWNER_TYPE" VARCHAR(10), "RETENTION" INTEGER NOT NULL, "SD_ID" 
BIGINT, "TBL_NAME" VARCHAR(256), "TBL_TYPE" VARCHAR(128), "VIEW_EXPANDED_TEXT" 
LONG VARCHAR, "VIEW_ORIGINAL_TEXT" LONG VARCHAR, "IS_REWRITE_ENABLED" CHAR(1) 
NOT NULL DEFAULT 'N');
+
+CREATE TABLE "APP"."PARTITION_KEYS" ("TBL_ID" BIGINT NOT NULL, "PKEY_COMMENT" 
VARCHAR(4000), "PKEY_NAME" VARCHAR(128) NOT NULL, "PKEY_TYPE" VARCHAR(767) NOT 
NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, "INPUT_FORMAT" 
VARCHAR(4000), "IS_COMPRESSED" CHAR(1) NOT NULL, "LOCATION" VARCHAR(4000), 
"NUM_BUCKETS" INTEGER NOT NULL, "OUTPUT_FORMAT" VARCHAR(4000), "SERDE_ID" 
BIGINT, "CD_ID" BIGINT, "IS_STOREDASSUBDIRECTORIES" CHAR(1) NOT NULL);
+
+CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL, 
"NEXT_VAL" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY" 
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, 
"BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY" 
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST_VALUES" ("STRING_LIST_ID" BIGINT NOT 
NULL, "STRING_LIST_VALUE" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_NAMES" ("SD_ID" BIGINT NOT NULL, 
"SKEWED_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ("SD_ID" BIGINT NOT NULL, 
"STRING_LIST_ID_KID" BIGINT NOT NULL, "LOCATION" VARCHAR(4000));
+
+CREATE TABLE "APP"."SKEWED_VALUES" ("SD_ID_OID" BIGINT NOT NULL, 
"STRING_LIST_ID_EID" BIGINT NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."VERSION" ("VER_ID" BIGINT NOT NULL, "SCHEMA_VERSION" 
VARCHAR(127) NOT NULL, "VERSION_COMMENT" VARCHAR(255));
+
+CREATE TABLE "APP"."CTLGS" (
+    "CTLG_ID" BIGINT NOT NULL,
+    "NAME" VARCHAR(256) UNIQUE,
+    "DESC" VARCHAR(4000),
+    "LOCATION_URI" VARCHAR(4000) NOT NULL);
+
+-- ----------------------------------------------
+-- DML Statements
+-- ----------------------------------------------
+
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM 
(VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) 
tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE" 
WHERE "SEQUENCE_NAME" = 
'org.apache.hadoop.hive.metastore.model.MNotificationLog');
+
+-- ----------------------------------------------
+-- DDL Statements for indexes
+-- ----------------------------------------------
+
+
+CREATE UNIQUE INDEX "APP"."ROLEENTITYINDEX" ON "APP"."ROLES" ("ROLE_NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_DATABASE" ON "APP"."DBS" ("NAME", 
"CTLG_NAME");
+
+CREATE UNIQUE INDEX "APP"."USERROLEMAPINDEX" ON "APP"."ROLE_MAP" 
("PRINCIPAL_NAME", "ROLE_ID", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."GLOBALPRIVILEGEINDEX" ON "APP"."GLOBAL_PRIVS" 
("AUTHORIZER", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "USER_PRIV", "GRANTOR", 
"GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_CATALOG" ON "APP"."CTLGS" ("NAME");
+
+
+-- ----------------------------------------------
+-- DDL Statements for keys
+-- ----------------------------------------------
+
+-- primary/unique
+ALTER TABLE "APP"."CDS" ADD CONSTRAINT "SQL110922153006460" PRIMARY KEY 
("CD_ID");
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEY_PK" PRIMARY 
KEY ("TBL_ID", "PKEY_NAME");
+
+ALTER TABLE "APP"."SEQUENCE_TABLE" ADD CONSTRAINT "SEQUENCE_TABLE_PK" PRIMARY 
KEY ("SEQUENCE_NAME");
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_PK" PRIMARY KEY ("SD_ID");
+
+ALTER TABLE "APP"."SERDES" ADD CONSTRAINT "SERDES_PK" PRIMARY KEY ("SERDE_ID");
+
+ALTER TABLE "APP"."ROLES" ADD CONSTRAINT "ROLES_PK" PRIMARY KEY ("ROLE_ID");
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_PK" PRIMARY KEY 
("SERDE_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_PK" PRIMARY KEY ("TBL_ID");
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_PK" PRIMARY KEY 
("SD_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_PK" 
PRIMARY KEY ("DB_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_PK" PRIMARY KEY ("DB_ID");
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_PK" PRIMARY KEY 
("ROLE_GRANT_ID");
+
+ALTER TABLE "APP"."GLOBAL_PRIVS" ADD CONSTRAINT "GLOBAL_PRIVS_PK" PRIMARY KEY 
("USER_GRANT_ID");
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_PK" PRIMARY 
KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_PK" PRIMARY KEY 
("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "SQL110922153006740" PRIMARY KEY 
("CD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_PK" PRIMARY KEY 
("TBL_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST" ADD CONSTRAINT "SKEWED_STRING_LIST_PK" 
PRIMARY KEY ("STRING_LIST_ID");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT 
"SKEWED_STRING_LIST_VALUES_PK" PRIMARY KEY ("STRING_LIST_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_PK" 
PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT 
"SKEWED_COL_VALUE_LOC_MAP_PK" PRIMARY KEY ("SD_ID", "STRING_LIST_ID_KID");
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_PK" PRIMARY 
KEY ("SD_ID_OID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."CTLGS" ADD CONSTRAINT "CTLG_PK" PRIMARY KEY ("CTLG_ID");
+
+-- foreign
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEYS_FK1" FOREIGN 
KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE 
NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK1" FOREIGN KEY ("SERDE_ID") 
REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK2" FOREIGN KEY ("CD_ID") 
REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_FK1" FOREIGN KEY 
("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON 
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK2" FOREIGN KEY ("SD_ID") 
REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK1" FOREIGN KEY ("DB_ID") 
REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_FK1" FOREIGN KEY ("CTLG_NAME") 
REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_FK1" FOREIGN KEY 
("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_FK1" 
FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON 
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_FK1" FOREIGN KEY 
("ROLE_ID") REFERENCES "APP"."ROLES" ("ROLE_ID") ON DELETE NO ACTION ON UPDATE 
NO ACTION;
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_FK1" FOREIGN 
KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_FK1" FOREIGN KEY 
("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "COLUMNS_V2_FK1" FOREIGN KEY 
("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_FK1" FOREIGN KEY 
("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT 
"SKEWED_STRING_LIST_VALUES_FK1" FOREIGN KEY ("STRING_LIST_ID") REFERENCES 
"APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_FK1" 
FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON 
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT 
"SKEWED_COL_VALUE_LOC_MAP_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" 
("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT 
"SKEWED_COL_VALUE_LOC_MAP_FK2" FOREIGN KEY ("STRING_LIST_ID_KID") REFERENCES 
"APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO 
ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK1" FOREIGN 
KEY ("SD_ID_OID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON 
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK2" FOREIGN 
KEY ("STRING_LIST_ID_EID") REFERENCES "APP"."SKEWED_STRING_LIST" 
("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."VERSION" ADD CONSTRAINT "VERSION_PK" PRIMARY KEY ("VER_ID");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_CTLG_FK" FOREIGN KEY ("CTLG_NAME") 
REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+-- ----------------------------------------------
+-- DDL Statements for checks
+-- ----------------------------------------------
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK 
(IS_COMPRESSED IN ('Y','N'));
+
+-- ----------------------------
+-- Transaction and Lock Tables
+-- ----------------------------
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID bigint NOT NULL,
+  HL_LOCK_INT_ID bigint NOT NULL,
+  HL_TXNID bigint NOT NULL,
+  HL_DB varchar(128) NOT NULL,
+  HL_TABLE varchar(128),
+  HL_PARTITION varchar(767),
+  HL_LOCK_STATE char(1) NOT NULL,
+  HL_LOCK_TYPE char(1) NOT NULL,
+  HL_LAST_HEARTBEAT bigint NOT NULL,
+  HL_ACQUIRED_AT bigint,
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT integer,
+  HL_AGENT_INFO varchar(128),
+  HL_BLOCKEDBY_EXT_ID bigint,
+  HL_BLOCKEDBY_INT_ID bigint,
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar(128) NOT NULL,
+  MT_KEY2 bigint NOT NULL,
+  MT_COMMENT varchar(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK
+--This is a good candidate for Index organized table
+
+-- -----------------------------------------------------------------
+-- Record schema version. Should be the last step in the init script
+-- -----------------------------------------------------------------
+INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES 
(1, '3.1.0', 'Hive release version 3.1.0');
\ No newline at end of file
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 2956d75a266..5307047354b 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,19 +19,27 @@ package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import java.util.Properties;
+import java.util.Map;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
   @Pure
+  @Nullable
   public abstract String getCatalogName();
 
   @Pure
-  public abstract Properties getProperties();
+  @Nullable
+  public abstract Map<String, String> getCatalogProperties();
+
+  @Pure
+  @Nullable
+  public abstract Map<String, String> getConfigProperties();
 
   @Pure
   public static Builder builder() {
@@ -39,15 +47,32 @@ public abstract class IcebergCatalogConfig implements 
Serializable {
   }
 
   public org.apache.iceberg.catalog.Catalog catalog() {
-    return CatalogUtil.buildIcebergCatalog(
-        getCatalogName(), Maps.fromProperties(getProperties()), new 
Configuration());
+    String catalogName = getCatalogName();
+    if (catalogName == null) {
+      catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
+    }
+    Map<String, String> catalogProps = getCatalogProperties();
+    if (catalogProps == null) {
+      catalogProps = Maps.newHashMap();
+    }
+    Map<String, String> confProps = getConfigProperties();
+    if (confProps == null) {
+      confProps = Maps.newHashMap();
+    }
+    Configuration config = new Configuration();
+    for (Map.Entry<String, String> prop : confProps.entrySet()) {
+      config.set(prop.getKey(), prop.getValue());
+    }
+    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
   }
 
   @AutoValue.Builder
   public abstract static class Builder {
-    public abstract Builder setCatalogName(String catalogName);
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    public abstract Builder setCatalogProperties(@Nullable Map<String, String> 
props);
 
-    public abstract Builder setProperties(Properties props);
+    public abstract Builder setConfigProperties(@Nullable Map<String, String> 
props);
 
     public abstract IcebergCatalogConfig build();
   }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index ef535353efd..df7bda4560d 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -18,18 +18,11 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import 
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -44,11 +37,12 @@ import org.apache.iceberg.catalog.TableIdentifier;
  * org.apache.beam.sdk.values.Row}s.
  */
 @AutoService(SchemaTransformProvider.class)
-public class IcebergReadSchemaTransformProvider extends 
TypedSchemaTransformProvider<Config> {
+public class IcebergReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
   static final String OUTPUT_TAG = "output";
 
   @Override
-  protected SchemaTransform from(Config configuration) {
+  protected SchemaTransform from(SchemaTransformConfiguration configuration) {
     return new IcebergReadSchemaTransform(configuration);
   }
 
@@ -62,38 +56,10 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv
     return ManagedTransformConstants.ICEBERG_READ;
   }
 
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class Config {
-    public static Builder builder() {
-      return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
-    }
-
-    @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
-    public abstract String getTable();
-
-    @SchemaFieldDescription("Name of the catalog containing the table.")
-    public abstract String getCatalogName();
-
-    @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
-    public abstract Map<String, String> getCatalogProperties();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setTable(String table);
-
-      public abstract Builder setCatalogName(String catalogName);
-
-      public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
-
-      public abstract Config build();
-    }
-  }
-
   static class IcebergReadSchemaTransform extends SchemaTransform {
-    private final Config configuration;
+    private final SchemaTransformConfiguration configuration;
 
-    IcebergReadSchemaTransform(Config configuration) {
+    IcebergReadSchemaTransform(SchemaTransformConfiguration configuration) {
       this.configuration = configuration;
     }
 
@@ -102,7 +68,7 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv
         // To stay consistent with our SchemaTransform configuration naming conventions,
         // we sort lexicographically and convert field names to snake_case
         return SchemaRegistry.createDefault()
-            .getToRowFunction(Config.class)
+            .getToRowFunction(SchemaTransformConfiguration.class)
             .apply(configuration)
             .sorted()
             .toSnakeCase();
@@ -113,19 +79,11 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      Properties properties = new Properties();
-      properties.putAll(configuration.getCatalogProperties());
-
-      IcebergCatalogConfig.Builder catalogBuilder =
-          IcebergCatalogConfig.builder()
-              .setCatalogName(configuration.getCatalogName())
-              .setProperties(properties);
-
       PCollection<Row> output =
           input
               .getPipeline()
               .apply(
-                  IcebergIO.readRows(catalogBuilder.build())
+                  IcebergIO.readRows(configuration.getIcebergCatalog())
                       .from(TableIdentifier.parse(configuration.getTable())));
 
       return PCollectionRowTuple.of(OUTPUT_TAG, output);
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b3de7a88c54..3f0f88946d9 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -18,19 +18,12 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -48,7 +41,8 @@ import org.apache.iceberg.catalog.TableIdentifier;
  * outputs a {@code PCollection<Row>} representing snapshots created in the process.
  */
 @AutoService(SchemaTransformProvider.class)
-public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
+public class IcebergWriteSchemaTransformProvider
+    extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
 
   static final String INPUT_TAG = "input";
   static final String OUTPUT_TAG = "output";
@@ -64,7 +58,7 @@ public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformPro
   }
 
   @Override
-  protected SchemaTransform from(Config configuration) {
+  protected SchemaTransform from(SchemaTransformConfiguration configuration) {
     return new IcebergWriteSchemaTransform(configuration);
   }
 
@@ -83,38 +77,10 @@ public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformPro
     return ManagedTransformConstants.ICEBERG_WRITE;
   }
 
-  @DefaultSchema(AutoValueSchema.class)
-  @AutoValue
-  public abstract static class Config {
-    public static Builder builder() {
-      return new AutoValue_IcebergWriteSchemaTransformProvider_Config.Builder();
-    }
-
-    @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
-    public abstract String getTable();
-
-    @SchemaFieldDescription("Name of the catalog containing the table.")
-    public abstract String getCatalogName();
-
-    @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
-    public abstract Map<String, String> getCatalogProperties();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setTable(String table);
-
-      public abstract Builder setCatalogName(String catalogName);
-
-      public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
-
-      public abstract Config build();
-    }
-  }
-
   static class IcebergWriteSchemaTransform extends SchemaTransform {
-    private final Config configuration;
+    private final SchemaTransformConfiguration configuration;
 
-    IcebergWriteSchemaTransform(Config configuration) {
+    IcebergWriteSchemaTransform(SchemaTransformConfiguration configuration) {
       this.configuration = configuration;
     }
 
@@ -123,7 +89,7 @@ public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformPro
         // To stay consistent with our SchemaTransform configuration naming conventions,
         // we sort lexicographically and convert field names to snake_case
         return SchemaRegistry.createDefault()
-            .getToRowFunction(Config.class)
+            .getToRowFunction(SchemaTransformConfiguration.class)
             .apply(configuration)
             .sorted()
             .toSnakeCase();
@@ -136,19 +102,11 @@ public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformPro
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PCollection<Row> rows = input.get(INPUT_TAG);
 
-      Properties properties = new Properties();
-      properties.putAll(configuration.getCatalogProperties());
-
-      IcebergCatalogConfig catalog =
-          IcebergCatalogConfig.builder()
-              .setCatalogName(configuration.getCatalogName())
-              .setProperties(properties)
-              .build();
-
       // TODO: support dynamic destinations
       IcebergWriteResult result =
           rows.apply(
-              IcebergIO.writeRows(catalog).to(TableIdentifier.parse(configuration.getTable())));
+              IcebergIO.writeRows(configuration.getIcebergCatalog())
+                  .to(TableIdentifier.parse(configuration.getTable())));
 
       PCollection<Row> snapshots =
           result
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
new file mode 100644
index 00000000000..6e7a12aa15a
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SchemaTransformConfiguration {
+  public static Builder builder() {
+    return new AutoValue_SchemaTransformConfiguration.Builder();
+  }
+
+  @SchemaFieldDescription("Identifier of the Iceberg table.")
+  public abstract String getTable();
+
+  @SchemaFieldDescription("Name of the catalog containing the table.")
+  @Nullable
+  public abstract String getCatalogName();
+
+  @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
+  @Nullable
+  public abstract Map<String, String> getCatalogProperties();
+
+  @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
+  @Nullable
+  public abstract Map<String, String> getConfigProperties();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setTable(String table);
+
+    public abstract Builder setCatalogName(String catalogName);
+
+    public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
+
+    public abstract Builder setConfigProperties(Map<String, String> confProperties);
+
+    public abstract SchemaTransformConfiguration build();
+  }
+
+  public IcebergCatalogConfig getIcebergCatalog() {
+    return IcebergCatalogConfig.builder()
+        .setCatalogName(getCatalogName())
+        .setCatalogProperties(getCatalogProperties())
+        .setConfigProperties(getConfigProperties())
+        .build();
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 3f31073b444..fe4a07dedfd 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,7 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -94,12 +95,17 @@ public class IcebergIOReadTest {
             .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
             .collect(Collectors.toList());
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     IcebergCatalogConfig catalogConfig =
-        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
 
     PCollection<Row> output =
         testPipeline
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index 02213c45e07..2abe6b09348 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -76,12 +75,17 @@ public class IcebergIOWriteTest implements Serializable {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
 
     testPipeline
         .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
@@ -110,12 +114,17 @@ public class IcebergIOWriteTest implements Serializable {
     Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
     Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
@@ -200,12 +209,17 @@ public class IcebergIOWriteTest implements Serializable {
       elementsPerTable.computeIfAbsent(tableId, ignored -> Lists.newArrayList()).add(element);
     }
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index effb5cc4838..0311c31da40 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -101,8 +101,8 @@ public class IcebergReadSchemaTransformProviderTest {
     properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
     properties.put("warehouse", warehouse.location);
 
-    IcebergReadSchemaTransformProvider.Config readConfig =
-        IcebergReadSchemaTransformProvider.Config.builder()
+    SchemaTransformConfiguration readConfig =
+        SchemaTransformConfiguration.builder()
             .setTable(identifier)
             .setCatalogName("name")
             .setCatalogProperties(properties)
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
index 7863f7812a1..86a5e0bcd43 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
@@ -71,6 +71,8 @@ public class IcebergSchemaTransformTranslationTest {
           .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
           .put("warehouse", "test_location")
           .build();
+  private static final Map<String, String> CONFIG_PROPERTIES =
+      ImmutableMap.<String, String>builder().put("key", "value").put("key2", "value2").build();
 
   @Test
   public void testReCreateWriteTransformFromRow() {
@@ -79,6 +81,7 @@ public class IcebergSchemaTransformTranslationTest {
             .withFieldValue("table", "test_table_identifier")
             .withFieldValue("catalog_name", "test-name")
             .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+            .withFieldValue("config_properties", CONFIG_PROPERTIES)
             .build();
     IcebergWriteSchemaTransform writeTransform =
         (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow);
@@ -110,6 +113,7 @@ public class IcebergSchemaTransformTranslationTest {
             .withFieldValue("table", "test_identifier")
             .withFieldValue("catalog_name", "test-name")
             .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+            .withFieldValue("config_properties", CONFIG_PROPERTIES)
             .build();
 
     IcebergWriteSchemaTransform writeTransform =
@@ -161,6 +165,7 @@ public class IcebergSchemaTransformTranslationTest {
             .withFieldValue("table", "test_table_identifier")
             .withFieldValue("catalog_name", "test-name")
             .withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+            .withFieldValue("config_properties", CONFIG_PROPERTIES)
             .build();
 
     IcebergReadSchemaTransform readTransform =
@@ -192,6 +197,7 @@ public class IcebergSchemaTransformTranslationTest {
             .withFieldValue("table", identifier)
             .withFieldValue("catalog_name", "test-name")
             .withFieldValue("catalog_properties", properties)
+            .withFieldValue("config_properties", CONFIG_PROPERTIES)
             .build();
 
     IcebergReadSchemaTransform readTransform =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index a2cd64e2395..6b555e7e14d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
 import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG;
 import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -89,8 +88,8 @@ public class IcebergWriteSchemaTransformProviderTest {
     properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
     properties.put("warehouse", warehouse.location);
 
-    Config config =
-        Config.builder()
+    SchemaTransformConfiguration config =
+        SchemaTransformConfiguration.builder()
             .setTable(identifier)
             .setCatalogName("name")
             .setCatalogProperties(properties)
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index 007cb028c66..38a15cb2aa9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -20,13 +20,14 @@ package org.apache.beam.sdk.io.iceberg;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -65,9 +66,11 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     BoundedSource<Row> source =
         new ScanSource(
@@ -75,7 +78,7 @@ public class ScanSourceTest {
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
                         .setCatalogName("name")
-                        .setProperties(props)
+                        .setCatalogProperties(catalogProps)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
@@ -107,9 +110,11 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     BoundedSource<Row> source =
         new ScanSource(
@@ -117,7 +122,7 @@ public class ScanSourceTest {
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
                         .setCatalogName("name")
-                        .setProperties(props)
+                        .setCatalogProperties(catalogProps)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
@@ -153,9 +158,11 @@ public class ScanSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
-    Properties props = new Properties();
-    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    props.setProperty("warehouse", warehouse.location);
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
 
     BoundedSource<Row> source =
         new ScanSource(
@@ -163,7 +170,7 @@ public class ScanSourceTest {
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
                         .setCatalogName("name")
-                        .setProperties(props)
+                        .setCatalogProperties(catalogProps)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4d4b93908a0..65a55885afa 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -365,3 +365,7 @@ include("sdks:java:io:solace")
 findProject(":sdks:java:io:solace")?.name = "solace"
 include("sdks:java:extensions:combiners")
 findProject(":sdks:java:extensions:combiners")?.name = "combiners"
+include("sdks:java:io:iceberg:hive")
+findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
+include("sdks:java:io:iceberg:hive:exec")
+findProject(":sdks:java:io:iceberg:hive:exec")?.name = "exec"


Reply via email to