This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d27dc82c568 [IcebergIO] Add ITs for RESTCatalog using BLMS (#35360)
d27dc82c568 is described below
commit d27dc82c56882e38a1df43ec90706f14811d50fb
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Mar 9 16:43:31 2026 -0400
[IcebergIO] Add ITs for RESTCatalog using BLMS (#35360)
* ITs for RESTCatalog using BLMS
* update rest catalog config
* use top-level gcs bucket for warehouse
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
sdks/java/io/iceberg/build.gradle | 1 +
.../io/iceberg/catalog/IcebergCatalogBaseIT.java | 28 +++++---
.../sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java | 75 ++++++++++++++++++++++
4 files changed, 95 insertions(+), 11 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 7ab7bcd9a9c..34a6e02150e 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 2
+ "modification": 4
}
diff --git a/sdks/java/io/iceberg/build.gradle
b/sdks/java/io/iceberg/build.gradle
index c188e4f6385..a1f352d0530 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -96,6 +96,7 @@ dependencies {
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation library.java.google_api_services_bigquery
+ testImplementation library.java.google_auth_library_oauth2_http
testRuntimeOnly library.java.slf4j_jdk14
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index c2c5dc0b8f4..e81f75c40fb 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
import static org.apache.beam.sdk.managed.Managed.ICEBERG;
import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
@@ -313,7 +315,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
};
protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
- IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+ beamSchemaToIcebergSchema(BEAM_SCHEMA);
protected static final SimpleFunction<Row, Record> RECORD_FUNC =
new SimpleFunction<Row, Record>() {
@Override
@@ -346,7 +348,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
}
DataWriter<Record> writer =
Parquet.writeData(file)
- .schema(ICEBERG_SCHEMA)
+ .schema(table.schema())
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(table.spec())
@@ -652,7 +654,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
pipeline.run().waitUntilFinish();
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
- assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));
+ assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(table.schema()));
// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
@@ -664,9 +666,9 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
public void testWriteToPartitionedTable() throws IOException {
Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
int truncLength = "value_x".length();
- config.put(
- "partition_fields",
- Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")"));
+ List<String> partitionFields =
+ Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")");
+ config.put("partition_fields", partitionFields);
PCollection<Row> input =
pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
@@ -674,6 +676,13 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
// Read back and check records are correct
Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
List<Record> returnedRecords = readRecords(table);
+ PartitionSpec expectedSpec =
+ PartitionSpec.builderFor(table.schema())
+ .identity("bool_field")
+ .hour("datetime")
+ .truncate("str", truncLength)
+ .build();
+ assertEquals(expectedSpec, table.spec());
assertThat(
returnedRecords,
containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
}
@@ -815,10 +824,8 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
Table table3 = catalog.loadTable(TableIdentifier.parse(tableId() +
"_3_d"));
Table table4 = catalog.loadTable(TableIdentifier.parse(tableId() +
"_4_e"));
- org.apache.iceberg.Schema tableSchema =
- IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
- assertTrue(t.schema().sameSchema(tableSchema));
+ assertEquals(rowFilter.outputSchema(), icebergSchemaToBeamSchema(t.schema()));
}
// Read back and check records are correct
@@ -830,6 +837,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
readRecords(table3),
readRecords(table4));
+ org.apache.iceberg.Schema tableSchema = beamSchemaToIcebergSchema(rowFilter.outputSchema());
SerializableFunction<Row, Record> recordFunc =
row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row);
@@ -936,7 +944,7 @@ public abstract class IcebergCatalogBaseIT implements
Serializable {
table3false,
table4true,
table4false)) {
- assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA));
+ assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(t.schema()));
}
// Read back and check records are correct
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
new file mode 100644
index 00000000000..c16df763333
--- /dev/null
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.catalog;
+
+import java.util.Map;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.junit.After;
+import org.junit.BeforeClass;
+
+/** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake Metastore. */
+public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT {
+ private static Map<String, String> catalogProps;
+
+ // Using a special bucket for this test class because
+ // BigLake does not support using subfolders as a warehouse (yet)
+ private static final String BIGLAKE_WAREHOUSE = "gs://managed-iceberg-biglake-its";
+
+ @BeforeClass
+ public static void setup() {
+ warehouse = BIGLAKE_WAREHOUSE;
+ catalogProps =
+ ImmutableMap.<String, String>builder()
+ .put("type", "rest")
+ .put("uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog")
+ .put("warehouse", BIGLAKE_WAREHOUSE)
+ .put("header.x-goog-user-project", OPTIONS.getProject())
+ .put("rest-metrics-reporting-enabled", "false")
+ .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
+ .put("rest.auth.type", "org.apache.iceberg.gcp.auth.GoogleAuthManager")
+ .build();
+ }
+
+ @After
+ public void after() {
+ // making sure the cleanup path is directed at the correct warehouse
+ warehouse = BIGLAKE_WAREHOUSE;
+ }
+
+ @Override
+ public String type() {
+ return "biglake";
+ }
+
+ @Override
+ public Catalog createCatalog() {
+ RESTCatalog restCatalog = new RESTCatalog();
+ restCatalog.initialize(catalogName, catalogProps);
+ return restCatalog;
+ }
+
+ @Override
+ public Map<String, Object> managedIcebergConfig(String tableId) {
+ return ImmutableMap.<String, Object>builder()
+ .put("table", tableId)
+ .put("catalog_properties", catalogProps)
+ .build();
+ }
+}