This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b21a84a4cd6 Managed Iceberg hive support and integration tests (#32052)
b21a84a4cd6 is described below
commit b21a84a4cd607f58a3794e274a913ca48da2e42c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Aug 9 17:38:35 2024 -0400
Managed Iceberg hive support and integration tests (#32052)
* iceberg hive support and integration tests
* split read and write tests; cleanup
* add test documentation
* extend new config_properties arg to translation tests
* revert beam schema override
* actually run hive ITs
* trigger integration tests
* cut down hive database source lines
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
.github/workflows/IO_Iceberg_Integration_Tests.yml | 2 +-
sdks/java/io/iceberg/build.gradle | 5 +
sdks/java/io/iceberg/hive/build.gradle | 80 ++++++
sdks/java/io/iceberg/hive/exec/build.gradle | 58 +++++
.../sdk/io/iceberg/hive/IcebergHiveCatalogIT.java | 280 +++++++++++++++++++++
.../hive/testutils/HiveMetastoreExtension.java | 68 +++++
.../io/iceberg/hive/testutils/ScriptRunner.java | 203 +++++++++++++++
.../iceberg/hive/testutils/TestHiveMetastore.java | 273 ++++++++++++++++++++
.../src/test/resources/hive-schema-3.1.0.derby.sql | 267 ++++++++++++++++++++
.../beam/sdk/io/iceberg/IcebergCatalogConfig.java | 37 ++-
.../IcebergReadSchemaTransformProvider.java | 56 +----
.../IcebergWriteSchemaTransformProvider.java | 58 +----
.../io/iceberg/SchemaTransformConfiguration.java | 69 +++++
.../beam/sdk/io/iceberg/IcebergIOReadTest.java | 16 +-
.../beam/sdk/io/iceberg/IcebergIOWriteTest.java | 40 ++-
.../IcebergReadSchemaTransformProviderTest.java | 4 +-
.../IcebergSchemaTransformTranslationTest.java | 6 +
.../IcebergWriteSchemaTransformProviderTest.java | 5 +-
.../apache/beam/sdk/io/iceberg/ScanSourceTest.java | 33 ++-
settings.gradle.kts | 4 +
21 files changed, 1423 insertions(+), 143 deletions(-)
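
In user-facing terms, the Managed Iceberg transform can now talk to a Hive-metastore-backed
catalog through the new "config_properties" option. A minimal write sketch mirroring the
integration test added below (the table name and metastore URI are placeholders, not values
from this commit):

    import java.util.Map;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    // "config_properties" carries Hadoop/Hive settings; here only the metastore URI is set,
    // the same key IcebergHiveCatalogIT uses in getManagedIcebergConfig().
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "test_db.test_table")
            .put(
                "config_properties",
                ImmutableMap.of("hive.metastore.uris", "thrift://localhost:9083"))
            .build();
    // rows is a PCollection<Row> whose schema matches the target table
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
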
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910e..62ae7886c57 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "modification": 3
+ "modification": 4
}
diff --git a/.github/workflows/IO_Iceberg_Integration_Tests.yml b/.github/workflows/IO_Iceberg_Integration_Tests.yml
index 20d1f4bb60f..22b2b4f9287 100644
--- a/.github/workflows/IO_Iceberg_Integration_Tests.yml
+++ b/.github/workflows/IO_Iceberg_Integration_Tests.yml
@@ -75,4 +75,4 @@ jobs:
- name: Run IcebergIO Integration Test
uses: ./.github/actions/gradle-command-self-hosted-action
with:
- gradle-command: :sdks:java:io:iceberg:integrationTest
\ No newline at end of file
+ gradle-command: :sdks:java:io:iceberg:catalogTests
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 7965cde86e7..3d653d6b276 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -115,6 +115,11 @@ task integrationTest(type: Test) {
testClassesDirs = sourceSets.test.output.classesDirs
}
+tasks.register('catalogTests') {
+ dependsOn integrationTest
+ dependsOn ":sdks:java:io:iceberg:hive:integrationTest"
+}
+
task loadTest(type: Test) {
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/temp-lt'
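
The new catalogTests aggregate (which the workflow change above now invokes) runs both the
existing IcebergIO integration tests and the Hive ones added below; a local invocation would
look roughly like:

    ./gradlew :sdks:java:io:iceberg:catalogTests
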
diff --git a/sdks/java/io/iceberg/hive/build.gradle b/sdks/java/io/iceberg/hive/build.gradle
new file mode 100644
index 00000000000..b81867ec90c
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/build.gradle
@@ -0,0 +1,80 @@
+import groovy.json.JsonOutput
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.iceberg.hive',
+ exportJavadoc: false,
+ shadowClosure: {},
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive"
+ext.summary = "Runtime dependencies needed for Hive catalog integration."
+
+def hive_version = "3.1.3"
+def iceberg_version = "1.4.2"
+
+dependencies {
+ // dependencies needed to run with iceberg's hive catalog
+ runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+ runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
+ runtimeOnly library.java.bigdataoss_gcs_connector
+ runtimeOnly library.java.hadoop_client
+
+ // ----- below dependencies are for testing and will not appear in the shaded jar -----
+ // Beam IcebergIO dependencies
+ testImplementation project(path: ":sdks:java:core", configuration: "shadow")
+ testImplementation project(":sdks:java:managed")
+ testImplementation project(":sdks:java:io:iceberg")
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+ testRuntimeOnly library.java.snake_yaml
+
+ // needed to set up the test environment
+ testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
+ testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version"
+ testImplementation "org.assertj:assertj-core:3.11.1"
+ testImplementation library.java.junit
+
+ // needed to set up test Hive Metastore and run tests
+ testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
+ testImplementation project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
+ testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") {
+ exclude group: "org.apache.hive", module: "hive-exec"
+ exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
+ }
+ testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
+ testImplementation "org.apache.parquet:parquet-column:1.12.0"
+}
+
+task integrationTest(type: Test) {
+ group = "Verification"
+ def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests/iceberg-hive-it'
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--tempLocation=${gcpTempLocation}",
+ ])
+
+ // Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
+ outputs.upToDateWhen { false }
+
+ include '**/*IT.class'
+
+ maxParallelForks 4
+ classpath = sourceSets.test.runtimeClasspath
+ testClassesDirs = sourceSets.test.output.classesDirs
+}
\ No newline at end of file
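
The warehouse for these Hive ITs defaults to gs://temp-storage-for-end-to-end-tests/iceberg-hive-it
and can be redirected through the gcpTempLocation project property; an illustrative invocation
(the bucket name is a placeholder):

    ./gradlew :sdks:java:io:iceberg:hive:integrationTest -PgcpTempLocation=gs://my-bucket/iceberg-hive-it
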
diff --git a/sdks/java/io/iceberg/hive/exec/build.gradle b/sdks/java/io/iceberg/hive/exec/build.gradle
new file mode 100644
index 00000000000..581f71ddedd
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/exec/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+ id 'org.apache.beam.module'
+ id 'java'
+ id 'com.github.johnrengelman.shadow'
+}
+
+dependencies {
+ implementation("org.apache.hive:hive-exec:3.1.3")
+ permitUnusedDeclared("org.apache.hive:hive-exec:3.1.3")
+}
+
+configurations {
+ shadow
+}
+
+artifacts {
+ shadow(archives(shadowJar) {
+ builtBy shadowJar
+ })
+}
+
+shadowJar {
+ zip64 true
+
+ // relocate packages bundled in hive-exec (protobuf, parquet) to avoid conflicts with the versions Beam and Iceberg bring in
+ relocate 'com.google.protobuf', getJavaRelocatedPath('com.google.protobuf')
+ relocate 'shaded.parquet', getJavaRelocatedPath('shaded.parquet')
+ relocate 'org.apache.parquet', getJavaRelocatedPath('org.apache.parquet')
+
+ version "3.1.3"
+ mergeServiceFiles()
+
+ exclude 'LICENSE'
+ exclude(
+ 'org/xml/**',
+ 'javax/**',
+ 'com/sun/**'
+ )
+}
+description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: Hive :: Exec"
+ext.summary = "A copy of the hive-exec dependency with some popular libraries
relocated."
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
new file mode 100644
index 00000000000..54a4998d37f
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/IcebergHiveCatalogIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.thrift.TException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Read and write test for {@link Managed} {@link org.apache.beam.sdk.io.iceberg.IcebergIO} using
+ * {@link HiveCatalog}.
+ *
+ * <p>Spins up a local Hive metastore to manage the Iceberg table. Warehouse path is set to a GCS
+ * bucket.
+ */
+public class IcebergHiveCatalogIT {
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("doubly_nested_str")
+ .addInt64Field("doubly_nested_float")
+ .build();
+
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build();
+ private static final Schema BEAM_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addBooleanField("bool")
+ .addNullableInt32Field("nullable_int")
+ .addNullableInt64Field("nullable_long")
+ .addArrayField("arr_long", Schema.FieldType.INT64)
+ .addRowField("row", NESTED_ROW_SCHEMA)
+ .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+ .build();
+
+ private static final SimpleFunction<Long, Row> ROW_FUNC =
+ new SimpleFunction<Long, Row>() {
+ @Override
+ public Row apply(Long num) {
+ String strNum = Long.toString(num);
+ Row nestedRow =
+ Row.withSchema(NESTED_ROW_SCHEMA)
+ .addValue("nested_str_value_" + strNum)
+ .addValue(Integer.valueOf(strNum))
+ .addValue(Float.valueOf(strNum + "." + strNum))
+ .addValue(
+ Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue("doubly_nested_str_value_" + strNum)
+ .addValue(num)
+ .build())
+ .build();
+
+ return Row.withSchema(BEAM_SCHEMA)
+ .addValue("str_value_" + strNum)
+ .addValue(num % 2 == 0)
+ .addValue(Integer.valueOf(strNum))
+ .addValue(num)
+ .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
+ .addValue(nestedRow)
+ .addValue(num % 2 == 0 ? null : nestedRow)
+ .build();
+ }
+ };
+
+ private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+ private static final SimpleFunction<Row, Record> RECORD_FUNC =
+ new SimpleFunction<Row, Record>() {
+ @Override
+ public Record apply(Row input) {
+ return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
+ }
+ };
+
+ private static HiveMetastoreExtension hiveMetastoreExtension;
+
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ private static final String TEST_CATALOG = "test_catalog";
+ private static final String TEST_TABLE = "test_table";
+ private static HiveCatalog catalog;
+ private static final String TEST_DB = "test_db_" + System.nanoTime();
+
+ @BeforeClass
+ public static void setUp() throws TException {
+ String warehousePath = TestPipeline.testingPipelineOptions().getTempLocation();
+ hiveMetastoreExtension = new HiveMetastoreExtension(warehousePath);
+ catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(),
+ TEST_CATALOG,
+ ImmutableMap.of(
+ CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+ String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+ hiveMetastoreExtension.hiveConf());
+
+ String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
+ Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
+ hiveMetastoreExtension.metastoreClient().createDatabase(db);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ hiveMetastoreExtension.cleanup();
+ }
+
+ private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
+ String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS);
+
+ Map<String, String> confProperties =
+ ImmutableMap.<String, String>builder()
+ .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
+ .build();
+
+ return ImmutableMap.<String, Object>builder()
+ .put("table", table.toString())
+ .put("config_properties", confProperties)
+ .build();
+ }
+
+ @Test
+ public void testReadWithHiveCatalog() throws IOException {
+ TableIdentifier tableIdentifier =
+ TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_read_test"));
+ Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);
+
+ List<Row> expectedRows =
+ LongStream.range(1, 1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
+ List<Record> records =
+ expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
+
+ // write iceberg records with hive catalog
+ String filepath = table.location() + "/" + UUID.randomUUID();
+ DataWriter<Record> writer =
+ Parquet.writeData(table.io().newOutputFile(filepath))
+ .schema(ICEBERG_SCHEMA)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .withSpec(table.spec())
+ .build();
+ for (Record rec : records) {
+ writer.write(rec);
+ }
+ writer.close();
+ AppendFiles appendFiles = table.newAppend();
+ String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
+ OutputFile outputFile = table.io().newOutputFile(manifestFilename);
+ ManifestWriter<DataFile> manifestWriter;
+ try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
+ openWriter.add(writer.toDataFile());
+ manifestWriter = openWriter;
+ }
+ appendFiles.appendManifest(manifestWriter.toManifestFile());
+ appendFiles.commit();
+
+ // Run Managed Iceberg read
+ PCollection<Row> outputRows =
+ readPipeline
+ .apply(
+ Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
+ .getSinglePCollection();
+ PAssert.that(outputRows).containsInAnyOrder(expectedRows);
+ readPipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testWriteWithHiveCatalog() {
+ TableIdentifier tableIdentifier =
+ TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE + "_write_test"));
+ catalog.createTable(tableIdentifier, IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));
+
+ List<Row> inputRows =
+ LongStream.range(1, 1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
+ List<Record> expectedRecords =
+ inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());
+
+ // Run Managed Iceberg write
+ writePipeline
+ .apply(Create.of(inputRows))
+ .setRowSchema(BEAM_SCHEMA)
+ .apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
+ writePipeline.run().waitUntilFinish();
+
+ // read back the records and check everything's there
+ Table table = catalog.loadTable(tableIdentifier);
+ TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
+ List<Record> writtenRecords = new ArrayList<>();
+ for (CombinedScanTask task : tableScan.planTasks()) {
+ InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption());
+ for (FileScanTask fileTask : task.files()) {
+ InputFile inputFile = decryptor.getInputFile(fileTask);
+ CloseableIterable<Record> iterable =
+ Parquet.read(inputFile)
+ .split(fileTask.start(), fileTask.length())
+ .project(ICEBERG_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
+ .filter(fileTask.residual())
+ .build();
+
+ for (Record rec : iterable) {
+ writtenRecords.add(rec);
+ }
+ }
+ }
+ assertThat(expectedRecords, containsInAnyOrder(writtenRecords.toArray()));
+ }
+}
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
new file mode 100644
index 00000000000..52de1b91a21
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/HiveMetastoreExtension.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A class that interacts with {@link TestHiveMetastore}.
+ *
+ * <p>Trimmed down from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveMetastoreExtension.java">Iceberg's
+ * integration testing util</a>
+ */
+public class HiveMetastoreExtension {
+ private HiveMetaStoreClient metastoreClient;
+ private TestHiveMetastore metastore;
+
+ public HiveMetastoreExtension(String warehousePath) throws MetaException {
+ metastore = new TestHiveMetastore(warehousePath);
+ HiveConf hiveConf = new HiveConf(TestHiveMetastore.class);
+
+ metastore.start(hiveConf);
+ metastoreClient = new HiveMetaStoreClient(hiveConf);
+ }
+
+ public void cleanup() throws Exception {
+ if (metastoreClient != null) {
+ metastoreClient.close();
+ }
+
+ if (metastore != null) {
+ metastore.reset();
+ metastore.stop();
+ }
+
+ metastoreClient = null;
+ metastore = null;
+ }
+
+ public HiveMetaStoreClient metastoreClient() {
+ return metastoreClient;
+ }
+
+ public HiveConf hiveConf() {
+ return metastore.hiveConf();
+ }
+
+ public TestHiveMetastore metastore() {
+ return metastore;
+ }
+}
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
new file mode 100644
index 00000000000..adf941e00b4
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/ScriptRunner.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Tool to run database scripts.
+ *
+ * <p>Copied over from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/ScriptRunner.java">Iceberg's
+ * integration testing</a>
+ */
+@SuppressWarnings({"OperatorPrecedence", "DefaultCharset"})
+public class ScriptRunner {
+
+ private static final String DEFAULT_DELIMITER = ";";
+
+ private final Connection connection;
+
+ private final boolean stopOnError;
+ private final boolean autoCommit;
+
+ private final PrintWriter logWriter = new PrintWriter(System.out);
+ private final PrintWriter errorLogWriter = new PrintWriter(System.err);
+
+ /** Default constructor. */
+ public ScriptRunner(Connection connection, boolean autoCommit, boolean stopOnError) {
+ this.connection = connection;
+ this.autoCommit = autoCommit;
+ this.stopOnError = stopOnError;
+ }
+
+ /**
+ * Runs an SQL script (read in using the Reader parameter).
+ *
+ * @param reader - the source of the script
+ */
+ public void runScript(Reader reader) throws IOException, SQLException {
+ try {
+ boolean originalAutoCommit = connection.getAutoCommit();
+ try {
+ if (originalAutoCommit != this.autoCommit) {
+ connection.setAutoCommit(this.autoCommit);
+ }
+ runScript(connection, reader);
+ } finally {
+ connection.setAutoCommit(originalAutoCommit);
+ }
+ } catch (IOException | SQLException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Error running script. Cause: " + e, e);
+ }
+ }
+
+ /**
+ * Runs an SQL script (read in using the Reader parameter) using the connection passed in.
+ *
+ * @param conn - the connection to use for the script
+ * @param reader - the source of the script
+ * @throws SQLException if any SQL errors occur
+ * @throws IOException if there is an error reading from the Reader
+ */
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
+ StringBuilder command = null;
+ try {
+ LineNumberReader lineReader = new LineNumberReader(reader);
+ String line;
+ while ((line = lineReader.readLine()) != null) {
+ if (command == null) {
+ command = new StringBuilder();
+ }
+ String trimmedLine = line.trim();
+ boolean fullLineDelimiter = false;
+ if (trimmedLine.startsWith("--")) {
+ println(trimmedLine);
+ } else if (trimmedLine.isEmpty() || trimmedLine.startsWith("//")) {
+ // Do nothing
+ } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
+ || fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
+ command.append(line, 0, line.lastIndexOf(getDelimiter()));
+ command.append(" ");
+ Statement statement = conn.createStatement();
+
+ println(command);
+
+ boolean hasResults = false;
+ if (stopOnError) {
+ hasResults = statement.execute(command.toString());
+ } else {
+ try {
+ statement.execute(command.toString());
+ } catch (SQLException e) {
+ e.fillInStackTrace();
+ printlnError("Error executing: " + command);
+ printlnError(e);
+ }
+ }
+
+ if (autoCommit && !conn.getAutoCommit()) {
+ conn.commit();
+ }
+
+ ResultSet rs = statement.getResultSet();
+ if (hasResults && rs != null) {
+ ResultSetMetaData md = rs.getMetaData();
+ int cols = md.getColumnCount();
+ for (int i = 0; i < cols; i++) {
+ String name = md.getColumnLabel(i);
+ print(name + "\t");
+ }
+ println("");
+ while (rs.next()) {
+ for (int i = 0; i < cols; i++) {
+ String value = rs.getString(i);
+ print(value + "\t");
+ }
+ println("");
+ }
+ }
+
+ command = null;
+ try {
+ statement.close();
+ } catch (Exception e) {
+ // Ignore to workaround a bug in Jakarta DBCP
+ }
+ Thread.yield();
+ } else {
+ command.append(line);
+ command.append(" ");
+ }
+ }
+ if (!autoCommit) {
+ conn.commit();
+ }
+ } catch (IOException | SQLException e) {
+ e.fillInStackTrace();
+ printlnError("Error executing: " + command);
+ printlnError(e);
+ throw e;
+ } finally {
+ conn.rollback();
+ flush();
+ }
+ }
+
+ private String getDelimiter() {
+ return DEFAULT_DELIMITER;
+ }
+
+ private void print(Object obj) {
+ if (logWriter != null) {
+ System.out.print(obj);
+ }
+ }
+
+ private void println(Object obj) {
+ if (logWriter != null) {
+ logWriter.println(obj);
+ }
+ }
+
+ private void printlnError(Object obj) {
+ if (errorLogWriter != null) {
+ errorLogWriter.println(obj);
+ }
+ }
+
+ private void flush() {
+ if (logWriter != null) {
+ logWriter.flush();
+ }
+ if (errorLogWriter != null) {
+ errorLogWriter.flush();
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
new file mode 100644
index 00000000000..e3af43d58c6
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/java/org/apache/beam/sdk/io/iceberg/hive/testutils/TestHiveMetastore.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.hive.testutils;
+
+import static java.nio.file.Files.createTempDirectory;
+import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute;
+import static java.nio.file.attribute.PosixFilePermissions.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
+import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.hive.HiveClientPool;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * A Hive Metastore implementation for local testing. Not meant to be used directly. Use {@link
+ * HiveMetastoreExtension} instead.
+ *
+ * <p>Copied over from <a
+ * href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
+ * integration testing util</a>
+ */
+public class TestHiveMetastore {
+
+ private static final String DEFAULT_DATABASE_NAME = "default";
+ private static final int DEFAULT_POOL_SIZE = 5;
+
+ // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies
+ // we need to do this because there is a breaking API change between Hive2 and Hive3
+ private static final DynConstructors.Ctor<HiveMetaStore.HMSHandler> HMS_HANDLER_CTOR =
+ DynConstructors.builder()
+ .impl(HiveMetaStore.HMSHandler.class, String.class, Configuration.class)
+ .impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class)
+ .build();
+
+ private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER =
+ DynMethods.builder("getProxy")
+ .impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class)
+ .impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
+ .buildStatic();
+
+ // Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various
+ // cleanup duties. These threads are scheduled and executed in a static thread pool
+ // (org.apache.hadoop.hive.metastore.ThreadPool).
+ // This thread pool is shut down normally as part of the JVM shutdown hook, but since we're
+ // creating and tearing down multiple metastore instances within the same JVM, we have to call
+ // this cleanup method manually, otherwise threads from our previous test suite will be stuck in
+ // the pool with stale config, and keep on being scheduled.
+ // This can lead to issues, e.g. accidental Persistence Manager closure by
+ // ScheduledQueryExecutionsMaintTask.
+ private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN =
+ DynMethods.builder("shutdown")
+ .impl("org.apache.hadoop.hive.metastore.ThreadPool")
+ .orNoop()
+ .buildStatic();
+
+ // It's tricky to clear all static fields in an HMS instance in order to switch derby root dir.
+ // Therefore, we reuse the same derby root between tests and remove it after JVM exits.
+ private static final File HIVE_LOCAL_DIR;
+ private static final String DERBY_PATH;
+
+ static {
+ try {
+ HIVE_LOCAL_DIR =
+ createTempDirectory("hive",
asFileAttribute(fromString("rwxrwxrwx"))).toFile();
+ DERBY_PATH = HIVE_LOCAL_DIR + "/metastore_db";
+ File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
+ System.setProperty("derby.stream.error.file",
derbyLogFile.getAbsolutePath());
+ setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ Path localDirPath = new Path(HIVE_LOCAL_DIR.getAbsolutePath());
+ FileSystem fs = Util.getFs(localDirPath, new Configuration());
+ String errMsg = "Failed to delete " + localDirPath;
+ try {
+ assertThat(fs.delete(localDirPath, true)).as(errMsg).isTrue();
+ } catch (IOException e) {
+ throw new RuntimeException(errMsg, e);
+ }
+ }));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to setup local dir for hive
metastore", e);
+ }
+ }
+
+ private HiveConf hiveConf;
+ private ExecutorService executorService;
+ private TServer server;
+ private HiveMetaStore.HMSHandler baseHandler;
+ private HiveClientPool clientPool;
+ private final String hiveWarehousePath;
+
+ TestHiveMetastore(String hiveWarehousePath) {
+ this.hiveWarehousePath = hiveWarehousePath;
+ }
+
+ /**
+ * Starts a TestHiveMetastore with the default connection pool size (5) with the provided
+ * HiveConf.
+ *
+ * @param conf The hive configuration to use
+ */
+ public void start(HiveConf conf) {
+ start(conf, DEFAULT_POOL_SIZE);
+ }
+
+ /**
+ * Starts a TestHiveMetastore with a provided connection pool size and HiveConf.
+ *
+ * @param conf The hive configuration to use
+ * @param poolSize The number of threads in the executor pool
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void start(HiveConf conf, int poolSize) {
+ try {
+ TServerSocket socket = new TServerSocket(0);
+ int port = socket.getServerSocket().getLocalPort();
+ initConf(conf, port);
+
+ this.hiveConf = conf;
+ this.server = newThriftServer(socket, poolSize, hiveConf);
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.executorService.submit(() -> server.serve());
+ this.clientPool = new HiveClientPool(1, hiveConf);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot start TestHiveMetastore", e);
+ }
+ }
+
+ public void stop() throws Exception {
+ reset();
+ if (clientPool != null) {
+ clientPool.close();
+ }
+ if (server != null) {
+ server.stop();
+ }
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ if (baseHandler != null) {
+ baseHandler.shutdown();
+ }
+ METASTORE_THREADS_SHUTDOWN.invoke();
+ }
+
+ public HiveConf hiveConf() {
+ return hiveConf;
+ }
+
+ public String getDatabasePath(String dbName) {
+ return hiveWarehousePath + "/" + dbName + ".db";
+ }
+
+ public void reset() throws Exception {
+ if (clientPool != null) {
+ for (String dbName : clientPool.run(client -> client.getAllDatabases())) {
+ for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) {
+ clientPool.run(
+ client -> {
+ client.dropTable(dbName, tblName, true, true, true);
+ return null;
+ });
+ }
+
+ if (!DEFAULT_DATABASE_NAME.equals(dbName)) {
+ // Drop cascade, functions dropped by cascade
+ clientPool.run(
+ client -> {
+ client.dropDatabase(dbName, true, true, true);
+ return null;
+ });
+ }
+ }
+ }
+
+ Path warehouseRoot = new Path(hiveWarehousePath);
+ FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
+ for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
+ if (!fileStatus.getPath().getName().equals("derby.log")
+ && !fileStatus.getPath().getName().equals("metastore_db")) {
+ fs.delete(fileStatus.getPath(), true);
+ }
+ }
+ }
+
+ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf)
+ throws Exception {
+ HiveConf serverConf = new HiveConf(conf);
+ serverConf.set(
+ HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+ "jdbc:derby:" + DERBY_PATH + ";create=true");
+ baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf);
+ IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false);
+
+ TThreadPoolServer.Args args =
+ new TThreadPoolServer.Args(socket)
+ .processor(new TSetIpAddressProcessor<>(handler))
+ .transportFactory(new TTransportFactory())
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .minWorkerThreads(poolSize)
+ .maxWorkerThreads(poolSize);
+
+ return new TThreadPoolServer(args);
+ }
+
+ private void initConf(HiveConf conf, int port) {
+ conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
+ conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, hiveWarehousePath);
+ conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
+ conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
+ conf.set("iceberg.hive.client-pool-size", "2");
+ // Setting this to avoid thrift exception during running Iceberg tests outside Iceberg.
+ conf.set(HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
+ }
+
+ private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
+ Connection connection = DriverManager.getConnection(dbURL);
+ ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
+
+ ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+ InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql");
+ try (Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
+ scriptRunner.runScript(reader);
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql b/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql
new file mode 100644
index 00000000000..808c6058576
--- /dev/null
+++ b/sdks/java/io/iceberg/hive/src/test/resources/hive-schema-3.1.0.derby.sql
@@ -0,0 +1,267 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- This file was copied from Apache Hive, at:
+-- https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-3.1.0.derby.sql
+--
+-- This has been modified slightly for compatibility with older Hive versions.
+--
+-- Timestamp: 2011-09-22 15:32:02.024
+-- Source database is: /home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Connection URL is: jdbc:derby:/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb
+-- Specified schema is: APP
+-- appendLogs: false
+
+-- ----------------------------------------------
+-- DDL Statements for functions
+-- ----------------------------------------------
+
+CREATE FUNCTION "APP"."NUCLEUS_ASCII" (C CHAR(1)) RETURNS INTEGER LANGUAGE
JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME
'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.ascii' ;
+
+CREATE FUNCTION "APP"."NUCLEUS_MATCHES" (TEXT VARCHAR(8000),PATTERN
VARCHAR(8000)) RETURNS INTEGER LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL
DATA CALLED ON NULL INPUT EXTERNAL NAME
'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.matches' ;
+
+-- ----------------------------------------------
+-- DDL Statements for tables
+-- ----------------------------------------------
+CREATE TABLE "APP"."DBS" (
+ "DB_ID" BIGINT NOT NULL,
+ "DESC" VARCHAR(4000),
+ "DB_LOCATION_URI" VARCHAR(4000) NOT NULL,
+ "NAME" VARCHAR(128),
+ "OWNER_NAME" VARCHAR(128),
+ "OWNER_TYPE" VARCHAR(10),
+ "CTLG_NAME" VARCHAR(256)
+);
+
+CREATE TABLE "APP"."DATABASE_PARAMS" ("DB_ID" BIGINT NOT NULL, "PARAM_KEY"
VARCHAR(180) NOT NULL, "PARAM_VALUE" VARCHAR(4000));
+
+CREATE TABLE "APP"."SERDE_PARAMS" ("SERDE_ID" BIGINT NOT NULL, "PARAM_KEY"
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."COLUMNS_V2" ("CD_ID" BIGINT NOT NULL, "COMMENT"
VARCHAR(4000), "COLUMN_NAME" VARCHAR(767) NOT NULL, "TYPE_NAME" CLOB,
"INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SORT_COLS" ("SD_ID" BIGINT NOT NULL, "COLUMN_NAME"
VARCHAR(767), "ORDER" INTEGER NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."CDS" ("CD_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SERDES" ("SERDE_ID" BIGINT NOT NULL, "NAME" VARCHAR(128),
"SLIB" VARCHAR(4000), "DESCRIPTION" VARCHAR(4000), "SERIALIZER_CLASS"
VARCHAR(4000), "DESERIALIZER_CLASS" VARCHAR(4000), SERDE_TYPE INTEGER);
+
+CREATE TABLE "APP"."ROLE_MAP" ("ROLE_GRANT_ID" BIGINT NOT NULL, "ADD_TIME"
INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR" VARCHAR(128),
"GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128), "PRINCIPAL_TYPE"
VARCHAR(128), "ROLE_ID" BIGINT);
+
+CREATE TABLE "APP"."GLOBAL_PRIVS" ("USER_GRANT_ID" BIGINT NOT NULL,
"CREATE_TIME" INTEGER NOT NULL, "GRANT_OPTION" SMALLINT NOT NULL, "GRANTOR"
VARCHAR(128), "GRANTOR_TYPE" VARCHAR(128), "PRINCIPAL_NAME" VARCHAR(128),
"PRINCIPAL_TYPE" VARCHAR(128), "USER_PRIV" VARCHAR(128), "AUTHORIZER"
VARCHAR(128));
+
+CREATE TABLE "APP"."ROLES" ("ROLE_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER
NOT NULL, "OWNER_NAME" VARCHAR(128), "ROLE_NAME" VARCHAR(128));
+
+CREATE TABLE "APP"."TBLS" ("TBL_ID" BIGINT NOT NULL, "CREATE_TIME" INTEGER NOT
NULL, "DB_ID" BIGINT, "LAST_ACCESS_TIME" INTEGER NOT NULL, "OWNER"
VARCHAR(767), "OWNER_TYPE" VARCHAR(10), "RETENTION" INTEGER NOT NULL, "SD_ID"
BIGINT, "TBL_NAME" VARCHAR(256), "TBL_TYPE" VARCHAR(128), "VIEW_EXPANDED_TEXT"
LONG VARCHAR, "VIEW_ORIGINAL_TEXT" LONG VARCHAR, "IS_REWRITE_ENABLED" CHAR(1)
NOT NULL DEFAULT 'N');
+
+CREATE TABLE "APP"."PARTITION_KEYS" ("TBL_ID" BIGINT NOT NULL, "PKEY_COMMENT"
VARCHAR(4000), "PKEY_NAME" VARCHAR(128) NOT NULL, "PKEY_TYPE" VARCHAR(767) NOT
NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, "INPUT_FORMAT"
VARCHAR(4000), "IS_COMPRESSED" CHAR(1) NOT NULL, "LOCATION" VARCHAR(4000),
"NUM_BUCKETS" INTEGER NOT NULL, "OUTPUT_FORMAT" VARCHAR(4000), "SERDE_ID"
BIGINT, "CD_ID" BIGINT, "IS_STOREDASSUBDIRECTORIES" CHAR(1) NOT NULL);
+
+CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL,
"NEXT_VAL" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."TABLE_PARAMS" ("TBL_ID" BIGINT NOT NULL, "PARAM_KEY"
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL,
"BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SD_PARAMS" ("SD_ID" BIGINT NOT NULL, "PARAM_KEY"
VARCHAR(256) NOT NULL, "PARAM_VALUE" CLOB);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID" BIGINT NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_STRING_LIST_VALUES" ("STRING_LIST_ID" BIGINT NOT
NULL, "STRING_LIST_VALUE" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_NAMES" ("SD_ID" BIGINT NOT NULL,
"SKEWED_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ("SD_ID" BIGINT NOT NULL,
"STRING_LIST_ID_KID" BIGINT NOT NULL, "LOCATION" VARCHAR(4000));
+
+CREATE TABLE "APP"."SKEWED_VALUES" ("SD_ID_OID" BIGINT NOT NULL,
"STRING_LIST_ID_EID" BIGINT NOT NULL, "INTEGER_IDX" INTEGER NOT NULL);
+
+CREATE TABLE "APP"."VERSION" ("VER_ID" BIGINT NOT NULL, "SCHEMA_VERSION"
VARCHAR(127) NOT NULL, "VERSION_COMMENT" VARCHAR(255));
+
+CREATE TABLE "APP"."CTLGS" (
+ "CTLG_ID" BIGINT NOT NULL,
+ "NAME" VARCHAR(256) UNIQUE,
+ "DESC" VARCHAR(4000),
+ "LOCATION_URI" VARCHAR(4000) NOT NULL);
+
+-- ----------------------------------------------
+-- DML Statements
+-- ----------------------------------------------
+
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM
(VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1))
tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE"
WHERE "SEQUENCE_NAME" =
'org.apache.hadoop.hive.metastore.model.MNotificationLog');
+
+-- ----------------------------------------------
+-- DDL Statements for indexes
+-- ----------------------------------------------
+
+
+CREATE UNIQUE INDEX "APP"."ROLEENTITYINDEX" ON "APP"."ROLES" ("ROLE_NAME");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_DATABASE" ON "APP"."DBS" ("NAME",
"CTLG_NAME");
+
+CREATE UNIQUE INDEX "APP"."USERROLEMAPINDEX" ON "APP"."ROLE_MAP"
("PRINCIPAL_NAME", "ROLE_ID", "GRANTOR", "GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."GLOBALPRIVILEGEINDEX" ON "APP"."GLOBAL_PRIVS"
("AUTHORIZER", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "USER_PRIV", "GRANTOR",
"GRANTOR_TYPE");
+
+CREATE UNIQUE INDEX "APP"."UNIQUE_CATALOG" ON "APP"."CTLGS" ("NAME");
+
+
+-- ----------------------------------------------
+-- DDL Statements for keys
+-- ----------------------------------------------
+
+-- primary/unique
+ALTER TABLE "APP"."CDS" ADD CONSTRAINT "SQL110922153006460" PRIMARY KEY
("CD_ID");
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEY_PK" PRIMARY
KEY ("TBL_ID", "PKEY_NAME");
+
+ALTER TABLE "APP"."SEQUENCE_TABLE" ADD CONSTRAINT "SEQUENCE_TABLE_PK" PRIMARY
KEY ("SEQUENCE_NAME");
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_PK" PRIMARY KEY ("SD_ID");
+
+ALTER TABLE "APP"."SERDES" ADD CONSTRAINT "SERDES_PK" PRIMARY KEY ("SERDE_ID");
+
+ALTER TABLE "APP"."ROLES" ADD CONSTRAINT "ROLES_PK" PRIMARY KEY ("ROLE_ID");
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_PK" PRIMARY KEY
("SERDE_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_PK" PRIMARY KEY ("TBL_ID");
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_PK" PRIMARY KEY
("SD_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_PK"
PRIMARY KEY ("DB_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_PK" PRIMARY KEY ("DB_ID");
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_PK" PRIMARY KEY
("ROLE_GRANT_ID");
+
+ALTER TABLE "APP"."GLOBAL_PRIVS" ADD CONSTRAINT "GLOBAL_PRIVS_PK" PRIMARY KEY
("USER_GRANT_ID");
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_PK" PRIMARY
KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_PK" PRIMARY KEY
("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "SQL110922153006740" PRIMARY KEY
("CD_ID", "COLUMN_NAME");
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_PK" PRIMARY KEY
("TBL_ID", "PARAM_KEY");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST" ADD CONSTRAINT "SKEWED_STRING_LIST_PK"
PRIMARY KEY ("STRING_LIST_ID");
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT
"SKEWED_STRING_LIST_VALUES_PK" PRIMARY KEY ("STRING_LIST_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_PK"
PRIMARY KEY ("SD_ID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT
"SKEWED_COL_VALUE_LOC_MAP_PK" PRIMARY KEY ("SD_ID", "STRING_LIST_ID_KID");
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_PK" PRIMARY
KEY ("SD_ID_OID", "INTEGER_IDX");
+
+ALTER TABLE "APP"."CTLGS" ADD CONSTRAINT "CTLG_PK" PRIMARY KEY ("CTLG_ID");
+
+-- foreign
+
+ALTER TABLE "APP"."PARTITION_KEYS" ADD CONSTRAINT "PARTITION_KEYS_FK1" FOREIGN
KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE
NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK1" FOREIGN KEY ("SERDE_ID")
REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SDS_FK2" FOREIGN KEY ("CD_ID")
REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SERDE_PARAMS" ADD CONSTRAINT "SERDE_PARAMS_FK1" FOREIGN KEY
("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK2" FOREIGN KEY ("SD_ID")
REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."TBLS" ADD CONSTRAINT "TBLS_FK1" FOREIGN KEY ("DB_ID")
REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_FK1" FOREIGN KEY ("CTLG_NAME")
REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SD_PARAMS" ADD CONSTRAINT "SD_PARAMS_FK1" FOREIGN KEY
("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."DATABASE_PARAMS" ADD CONSTRAINT "DATABASE_PARAMS_FK1"
FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."ROLE_MAP" ADD CONSTRAINT "ROLE_MAP_FK1" FOREIGN KEY
("ROLE_ID") REFERENCES "APP"."ROLES" ("ROLE_ID") ON DELETE NO ACTION ON UPDATE
NO ACTION;
+
+ALTER TABLE "APP"."BUCKETING_COLS" ADD CONSTRAINT "BUCKETING_COLS_FK1" FOREIGN
KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."SORT_COLS" ADD CONSTRAINT "SORT_COLS_FK1" FOREIGN KEY
("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."COLUMNS_V2" ADD CONSTRAINT "COLUMNS_V2_FK1" FOREIGN KEY
("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."TABLE_PARAMS" ADD CONSTRAINT "TABLE_PARAMS_FK1" FOREIGN KEY
("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" ADD CONSTRAINT
"SKEWED_STRING_LIST_VALUES_FK1" FOREIGN KEY ("STRING_LIST_ID") REFERENCES
"APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_NAMES" ADD CONSTRAINT "SKEWED_COL_NAMES_FK1"
FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT
"SKEWED_COL_VALUE_LOC_MAP_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS"
("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" ADD CONSTRAINT
"SKEWED_COL_VALUE_LOC_MAP_FK2" FOREIGN KEY ("STRING_LIST_ID_KID") REFERENCES
"APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO
ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK1" FOREIGN
KEY ("SD_ID_OID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON
UPDATE NO ACTION;
+
+ALTER TABLE "APP"."SKEWED_VALUES" ADD CONSTRAINT "SKEWED_VALUES_FK2" FOREIGN
KEY ("STRING_LIST_ID_EID") REFERENCES "APP"."SKEWED_STRING_LIST"
("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+ALTER TABLE "APP"."VERSION" ADD CONSTRAINT "VERSION_PK" PRIMARY KEY ("VER_ID");
+
+ALTER TABLE "APP"."DBS" ADD CONSTRAINT "DBS_CTLG_FK" FOREIGN KEY ("CTLG_NAME")
REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION;
+
+-- ----------------------------------------------
+-- DDL Statements for checks
+-- ----------------------------------------------
+
+ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK
(IS_COMPRESSED IN ('Y','N'));
+
+-- ----------------------------
+-- Transaction and Lock Tables
+-- ----------------------------
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID bigint NOT NULL,
+ HL_LOCK_INT_ID bigint NOT NULL,
+ HL_TXNID bigint NOT NULL,
+ HL_DB varchar(128) NOT NULL,
+ HL_TABLE varchar(128),
+ HL_PARTITION varchar(767),
+ HL_LOCK_STATE char(1) NOT NULL,
+ HL_LOCK_TYPE char(1) NOT NULL,
+ HL_LAST_HEARTBEAT bigint NOT NULL,
+ HL_ACQUIRED_AT bigint,
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT integer,
+ HL_AGENT_INFO varchar(128),
+ HL_BLOCKEDBY_EXT_ID bigint,
+ HL_BLOCKEDBY_INT_ID bigint,
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar(128) NOT NULL,
+ MT_KEY2 bigint NOT NULL,
+ MT_COMMENT varchar(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK
+--This is a good candidate for Index orgainzed table
+
+-- -----------------------------------------------------------------
+-- Record schema version. Should be the last step in the init script
+-- -----------------------------------------------------------------
+INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) VALUES
(1, '3.1.0', 'Hive release version 3.1.0');
\ No newline at end of file
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 2956d75a266..5307047354b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,19 +19,27 @@ package org.apache.beam.sdk.io.iceberg;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
-import java.util.Properties;
+import java.util.Map;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {
@Pure
+ @Nullable
public abstract String getCatalogName();
@Pure
- public abstract Properties getProperties();
+ @Nullable
+ public abstract Map<String, String> getCatalogProperties();
+
+ @Pure
+ @Nullable
+ public abstract Map<String, String> getConfigProperties();
@Pure
public static Builder builder() {
@@ -39,15 +47,32 @@ public abstract class IcebergCatalogConfig implements Serializable {
}
public org.apache.iceberg.catalog.Catalog catalog() {
- return CatalogUtil.buildIcebergCatalog(
- getCatalogName(), Maps.fromProperties(getProperties()), new Configuration());
+ String catalogName = getCatalogName();
+ if (catalogName == null) {
+ catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
+ }
+ Map<String, String> catalogProps = getCatalogProperties();
+ if (catalogProps == null) {
+ catalogProps = Maps.newHashMap();
+ }
+ Map<String, String> confProps = getConfigProperties();
+ if (confProps == null) {
+ confProps = Maps.newHashMap();
+ }
+ Configuration config = new Configuration();
+ for (Map.Entry<String, String> prop : confProps.entrySet()) {
+ config.set(prop.getKey(), prop.getValue());
+ }
+ return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
}
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setCatalogName(String catalogName);
+ public abstract Builder setCatalogName(@Nullable String catalogName);
+
+ public abstract Builder setCatalogProperties(@Nullable Map<String, String>
props);
- public abstract Builder setProperties(Properties props);
+ public abstract Builder setConfigProperties(@Nullable Map<String, String>
props);
public abstract IcebergCatalogConfig build();
}
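
As a quick orientation for reviewers, here is a minimal sketch of the reshaped builder surface: catalog properties configure the Iceberg catalog itself, while config properties are copied into the Hadoop Configuration before the catalog is built, and a null catalog name falls back to an "apache-beam-<version>" default. The catalog name, warehouse path, and empty Hadoop override map below are placeholders, not values taken from this change.

import java.util.Map;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;

class CatalogConfigSketch {
  static IcebergCatalogConfig hadoopCatalog(String warehousePath) {
    // Catalog properties are handed to CatalogUtil.buildIcebergCatalog as-is.
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
            "warehouse", warehousePath);
    // Config properties become entries in the Hadoop Configuration; left empty here.
    Map<String, String> configProps = ImmutableMap.of();
    return IcebergCatalogConfig.builder()
        .setCatalogName("example_catalog") // optional; null defaults to "apache-beam-<version>"
        .setCatalogProperties(catalogProps)
        .setConfigProperties(configProps)
        .build();
    // Calling .catalog() on the result builds an org.apache.iceberg.catalog.Catalog.
  }
}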
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index ef535353efd..df7bda4560d 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -18,18 +18,11 @@
package org.apache.beam.sdk.io.iceberg;
import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import
org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -44,11 +37,12 @@ import org.apache.iceberg.catalog.TableIdentifier;
* org.apache.beam.sdk.values.Row}s.
*/
@AutoService(SchemaTransformProvider.class)
-public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProvider<Config> {
+public class IcebergReadSchemaTransformProvider
+ extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
static final String OUTPUT_TAG = "output";
@Override
- protected SchemaTransform from(Config configuration) {
+ protected SchemaTransform from(SchemaTransformConfiguration configuration) {
return new IcebergReadSchemaTransform(configuration);
}
@@ -62,38 +56,10 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
return ManagedTransformConstants.ICEBERG_READ;
}
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class Config {
- public static Builder builder() {
- return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
- }
-
- @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
- public abstract String getTable();
-
- @SchemaFieldDescription("Name of the catalog containing the table.")
- public abstract String getCatalogName();
-
- @SchemaFieldDescription("Configuration properties used to set up the
Iceberg catalog.")
- public abstract Map<String, String> getCatalogProperties();
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setTable(String table);
-
- public abstract Builder setCatalogName(String catalogName);
-
- public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
-
- public abstract Config build();
- }
- }
-
static class IcebergReadSchemaTransform extends SchemaTransform {
- private final Config configuration;
+ private final SchemaTransformConfiguration configuration;
- IcebergReadSchemaTransform(Config configuration) {
+ IcebergReadSchemaTransform(SchemaTransformConfiguration configuration) {
this.configuration = configuration;
}
@@ -102,7 +68,7 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
// To stay consistent with our SchemaTransform configuration naming
conventions,
// we sort lexicographically and convert field names to snake_case
return SchemaRegistry.createDefault()
- .getToRowFunction(Config.class)
+ .getToRowFunction(SchemaTransformConfiguration.class)
.apply(configuration)
.sorted()
.toSnakeCase();
@@ -113,19 +79,11 @@ public class IcebergReadSchemaTransformProvider extends
TypedSchemaTransformProv
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- Properties properties = new Properties();
- properties.putAll(configuration.getCatalogProperties());
-
- IcebergCatalogConfig.Builder catalogBuilder =
- IcebergCatalogConfig.builder()
- .setCatalogName(configuration.getCatalogName())
- .setProperties(properties);
-
PCollection<Row> output =
input
.getPipeline()
.apply(
- IcebergIO.readRows(catalogBuilder.build())
+ IcebergIO.readRows(configuration.getIcebergCatalog())
.from(TableIdentifier.parse(configuration.getTable())));
return PCollectionRowTuple.of(OUTPUT_TAG, output);
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b3de7a88c54..3f0f88946d9 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -18,19 +18,12 @@
package org.apache.beam.sdk.io.iceberg;
import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -48,7 +41,8 @@ import org.apache.iceberg.catalog.TableIdentifier;
* outputs a {@code PCollection<Row>} representing snapshots created in the
process.
*/
@AutoService(SchemaTransformProvider.class)
-public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformProvider<Config> {
+public class IcebergWriteSchemaTransformProvider
+ extends TypedSchemaTransformProvider<SchemaTransformConfiguration> {
static final String INPUT_TAG = "input";
static final String OUTPUT_TAG = "output";
@@ -64,7 +58,7 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
}
@Override
- protected SchemaTransform from(Config configuration) {
+ protected SchemaTransform from(SchemaTransformConfiguration configuration) {
return new IcebergWriteSchemaTransform(configuration);
}
@@ -83,38 +77,10 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
return ManagedTransformConstants.ICEBERG_WRITE;
}
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class Config {
- public static Builder builder() {
- return new
AutoValue_IcebergWriteSchemaTransformProvider_Config.Builder();
- }
-
- @SchemaFieldDescription("Identifier of the Iceberg table to write to.")
- public abstract String getTable();
-
- @SchemaFieldDescription("Name of the catalog containing the table.")
- public abstract String getCatalogName();
-
- @SchemaFieldDescription("Configuration properties used to set up the
Iceberg catalog.")
- public abstract Map<String, String> getCatalogProperties();
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setTable(String table);
-
- public abstract Builder setCatalogName(String catalogName);
-
- public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
-
- public abstract Config build();
- }
- }
-
static class IcebergWriteSchemaTransform extends SchemaTransform {
- private final Config configuration;
+ private final SchemaTransformConfiguration configuration;
- IcebergWriteSchemaTransform(Config configuration) {
+ IcebergWriteSchemaTransform(SchemaTransformConfiguration configuration) {
this.configuration = configuration;
}
@@ -123,7 +89,7 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
// To stay consistent with our SchemaTransform configuration naming
conventions,
// we sort lexicographically and convert field names to snake_case
return SchemaRegistry.createDefault()
- .getToRowFunction(Config.class)
+ .getToRowFunction(SchemaTransformConfiguration.class)
.apply(configuration)
.sorted()
.toSnakeCase();
@@ -136,19 +102,11 @@ public class IcebergWriteSchemaTransformProvider extends
TypedSchemaTransformPro
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rows = input.get(INPUT_TAG);
- Properties properties = new Properties();
- properties.putAll(configuration.getCatalogProperties());
-
- IcebergCatalogConfig catalog =
- IcebergCatalogConfig.builder()
- .setCatalogName(configuration.getCatalogName())
- .setProperties(properties)
- .build();
-
// TODO: support dynamic destinations
IcebergWriteResult result =
rows.apply(
-
IcebergIO.writeRows(catalog).to(TableIdentifier.parse(configuration.getTable())));
+ IcebergIO.writeRows(configuration.getIcebergCatalog())
+ .to(TableIdentifier.parse(configuration.getTable())));
PCollection<Row> snapshots =
result
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
new file mode 100644
index 00000000000..6e7a12aa15a
--- /dev/null
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaTransformConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SchemaTransformConfiguration {
+ public static Builder builder() {
+ return new AutoValue_SchemaTransformConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription("Identifier of the Iceberg table.")
+ public abstract String getTable();
+
+ @SchemaFieldDescription("Name of the catalog containing the table.")
+ @Nullable
+ public abstract String getCatalogName();
+
+ @SchemaFieldDescription("Properties used to set up the Iceberg catalog.")
+ @Nullable
+ public abstract Map<String, String> getCatalogProperties();
+
+ @SchemaFieldDescription("Properties passed to the Hadoop Configuration.")
+ @Nullable
+ public abstract Map<String, String> getConfigProperties();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setTable(String table);
+
+ public abstract Builder setCatalogName(String catalogName);
+
+ public abstract Builder setCatalogProperties(Map<String, String>
catalogProperties);
+
+ public abstract Builder setConfigProperties(Map<String, String>
confProperties);
+
+ public abstract SchemaTransformConfiguration build();
+ }
+
+ public IcebergCatalogConfig getIcebergCatalog() {
+ return IcebergCatalogConfig.builder()
+ .setCatalogName(getCatalogName())
+ .setCatalogProperties(getCatalogProperties())
+ .setConfigProperties(getConfigProperties())
+ .build();
+ }
+}
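
Since both the read and write providers now share this configuration class, a short usage sketch may help: only the table identifier is required, the remaining fields are nullable, and getIcebergCatalog() is the single point where the providers resolve their catalog. The table identifier, catalog name, and warehouse path below are placeholders.

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.SchemaTransformConfiguration;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;

class TransformConfigSketch {
  static IcebergCatalogConfig catalogFor(String tableIdentifier, String warehousePath) {
    SchemaTransformConfiguration config =
        SchemaTransformConfiguration.builder()
            .setTable(tableIdentifier)         // e.g. "db.example_table" (placeholder)
            .setCatalogName("example_catalog") // placeholder; may be omitted
            .setCatalogProperties(
                ImmutableMap.of(
                    "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
                    "warehouse", warehousePath))
            .build();
    // Both IcebergReadSchemaTransform and IcebergWriteSchemaTransform call this
    // to obtain the catalog config used by IcebergIO.readRows / writeRows.
    return config.getIcebergCatalog();
  }
}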
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 3f31073b444..fe4a07dedfd 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,7 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -94,12 +95,17 @@ public class IcebergIOReadTest {
.map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
.collect(Collectors.toList());
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
IcebergCatalogConfig catalogConfig =
-
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+ IcebergCatalogConfig.builder()
+ .setCatalogName("name")
+ .setCatalogProperties(catalogProps)
+ .build();
PCollection<Row> output =
testPipeline
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index 02213c45e07..2abe6b09348 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -76,12 +75,17 @@ public class IcebergIOWriteTest implements Serializable {
// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
IcebergCatalogConfig catalog =
-
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+ IcebergCatalogConfig.builder()
+ .setCatalogName("name")
+ .setCatalogProperties(catalogProps)
+ .build();
testPipeline
.apply("Records To Add",
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
@@ -110,12 +114,17 @@ public class IcebergIOWriteTest implements Serializable {
Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
IcebergCatalogConfig catalog =
-
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+ IcebergCatalogConfig.builder()
+ .setCatalogName("name")
+ .setCatalogProperties(catalogProps)
+ .build();
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
@@ -200,12 +209,17 @@ public class IcebergIOWriteTest implements Serializable {
elementsPerTable.computeIfAbsent(tableId, ignored ->
Lists.newArrayList()).add(element);
}
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
IcebergCatalogConfig catalog =
-
IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
+ IcebergCatalogConfig.builder()
+ .setCatalogName("name")
+ .setCatalogProperties(catalogProps)
+ .build();
DynamicDestinations dynamicDestinations =
new DynamicDestinations() {
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index effb5cc4838..0311c31da40 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -101,8 +101,8 @@ public class IcebergReadSchemaTransformProviderTest {
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
- IcebergReadSchemaTransformProvider.Config readConfig =
- IcebergReadSchemaTransformProvider.Config.builder()
+ SchemaTransformConfiguration readConfig =
+ SchemaTransformConfiguration.builder()
.setTable(identifier)
.setCatalogName("name")
.setCatalogProperties(properties)
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
index 7863f7812a1..86a5e0bcd43 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java
@@ -71,6 +71,8 @@ public class IcebergSchemaTransformTranslationTest {
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
.put("warehouse", "test_location")
.build();
+ private static final Map<String, String> CONFIG_PROPERTIES =
+ ImmutableMap.<String, String>builder().put("key", "value").put("key2",
"value2").build();
@Test
public void testReCreateWriteTransformFromRow() {
@@ -79,6 +81,7 @@ public class IcebergSchemaTransformTranslationTest {
.withFieldValue("table", "test_table_identifier")
.withFieldValue("catalog_name", "test-name")
.withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+ .withFieldValue("config_properties", CONFIG_PROPERTIES)
.build();
IcebergWriteSchemaTransform writeTransform =
(IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow);
@@ -110,6 +113,7 @@ public class IcebergSchemaTransformTranslationTest {
.withFieldValue("table", "test_identifier")
.withFieldValue("catalog_name", "test-name")
.withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+ .withFieldValue("config_properties", CONFIG_PROPERTIES)
.build();
IcebergWriteSchemaTransform writeTransform =
@@ -161,6 +165,7 @@ public class IcebergSchemaTransformTranslationTest {
.withFieldValue("table", "test_table_identifier")
.withFieldValue("catalog_name", "test-name")
.withFieldValue("catalog_properties", CATALOG_PROPERTIES)
+ .withFieldValue("config_properties", CONFIG_PROPERTIES)
.build();
IcebergReadSchemaTransform readTransform =
@@ -192,6 +197,7 @@ public class IcebergSchemaTransformTranslationTest {
.withFieldValue("table", identifier)
.withFieldValue("catalog_name", "test-name")
.withFieldValue("catalog_properties", properties)
+ .withFieldValue("config_properties", CONFIG_PROPERTIES)
.build();
IcebergReadSchemaTransform readTransform =
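
For context on the new config_properties field exercised above, here is a rough sketch of a full Row-based configuration, assuming the provider's standard configurationSchema()/from(Row) entry points; field names follow the snake_case convention used by these providers, and all values are placeholders.

import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

class RowConfigSketch {
  static SchemaTransform readTransform(IcebergReadSchemaTransformProvider provider) {
    // "config_properties" is forwarded to the Hadoop Configuration; the other
    // fields mirror SchemaTransformConfiguration. All values here are placeholders.
    Row configRow =
        Row.withSchema(provider.configurationSchema())
            .withFieldValue("table", "db.example_table")
            .withFieldValue("catalog_name", "example_catalog")
            .withFieldValue("catalog_properties", ImmutableMap.of("warehouse", "/tmp/warehouse"))
            .withFieldValue("config_properties", ImmutableMap.of("key", "value"))
            .build();
    return provider.from(configRow);
  }
}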
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index a2cd64e2395..6b555e7e14d 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
import static
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG;
import static
org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -89,8 +88,8 @@ public class IcebergWriteSchemaTransformProviderTest {
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
- Config config =
- Config.builder()
+ SchemaTransformConfiguration config =
+ SchemaTransformConfiguration.builder()
.setTable(identifier)
.setCatalogName("name")
.setCatalogProperties(properties)
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index 007cb028c66..38a15cb2aa9 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -20,13 +20,14 @@ package org.apache.beam.sdk.io.iceberg;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -65,9 +66,11 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
BoundedSource<Row> source =
new ScanSource(
@@ -75,7 +78,7 @@ public class ScanSourceTest {
.setCatalogConfig(
IcebergCatalogConfig.builder()
.setCatalogName("name")
- .setProperties(props)
+ .setCatalogProperties(catalogProps)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))
@@ -107,9 +110,11 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
BoundedSource<Row> source =
new ScanSource(
@@ -117,7 +122,7 @@ public class ScanSourceTest {
.setCatalogConfig(
IcebergCatalogConfig.builder()
.setCatalogName("name")
- .setProperties(props)
+ .setCatalogProperties(catalogProps)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))
@@ -153,9 +158,11 @@ public class ScanSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
- Properties props = new Properties();
- props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
- props.setProperty("warehouse", warehouse.location);
+ Map<String, String> catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+ .put("warehouse", warehouse.location)
+ .build();
BoundedSource<Row> source =
new ScanSource(
@@ -163,7 +170,7 @@ public class ScanSourceTest {
.setCatalogConfig(
IcebergCatalogConfig.builder()
.setCatalogName("name")
- .setProperties(props)
+ .setCatalogProperties(catalogProps)
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.",
"").split("\\."))
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 4d4b93908a0..65a55885afa 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -365,3 +365,7 @@ include("sdks:java:io:solace")
findProject(":sdks:java:io:solace")?.name = "solace"
include("sdks:java:extensions:combiners")
findProject(":sdks:java:extensions:combiners")?.name = "combiners"
+include("sdks:java:io:iceberg:hive")
+findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
+include("sdks:java:io:iceberg:hive:exec")
+findProject(":sdks:java:io:iceberg:hive:exec")?.name = "exec"