This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d27dc82c568 [IcebergIO] Add ITs for RESTCatalog using BLMS (#35360)
d27dc82c568 is described below

commit d27dc82c56882e38a1df43ec90706f14811d50fb
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Mar 9 16:43:31 2026 -0400

    [IcebergIO] Add ITs for RESTCatalog using BLMS (#35360)
    
    * ITs for RESTCatalog using BLMS
    
    * update rest catalog config
    
    * use top-level gcs bucket for warehouse
---
 .../IO_Iceberg_Integration_Tests.json              |  2 +-
 sdks/java/io/iceberg/build.gradle                  |  1 +
 .../io/iceberg/catalog/IcebergCatalogBaseIT.java   | 28 +++++---
 .../sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java  | 75 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 11 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 7ab7bcd9a9c..34a6e02150e 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 2
+    "modification": 4
 }
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index c188e4f6385..a1f352d0530 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -96,6 +96,7 @@ dependencies {
     testImplementation project(":sdks:java:io:google-cloud-platform")
     testImplementation library.java.google_api_services_bigquery
 
+    testImplementation library.java.google_auth_library_oauth2_http
     testRuntimeOnly library.java.slf4j_jdk14
     testImplementation project(path: ":runners:direct-java", configuration: 
"shadow")
     testRuntimeOnly project(path: ":runners:google-cloud-dataflow-java")
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index c2c5dc0b8f4..e81f75c40fb 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamSchemaToIcebergSchema;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema;
 import static org.apache.beam.sdk.managed.Managed.ICEBERG;
 import static org.apache.beam.sdk.managed.Managed.ICEBERG_CDC;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
@@ -313,7 +315,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
       };
 
   protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
-      IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+      beamSchemaToIcebergSchema(BEAM_SCHEMA);
   protected static final SimpleFunction<Row, Record> RECORD_FUNC =
       new SimpleFunction<Row, Record>() {
         @Override
@@ -346,7 +348,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
       }
       DataWriter<Record> writer =
           Parquet.writeData(file)
-              .schema(ICEBERG_SCHEMA)
+              .schema(table.schema())
               .createWriterFunc(GenericParquetWriter::create)
               .overwrite()
               .withSpec(table.spec())
@@ -652,7 +654,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
     pipeline.run().waitUntilFinish();
 
     Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
-    assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));
+    assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(table.schema()));
 
     // Read back and check records are correct
     List<Record> returnedRecords = readRecords(table);
@@ -664,9 +666,9 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
   public void testWriteToPartitionedTable() throws IOException {
     Map<String, Object> config = new 
HashMap<>(managedIcebergConfig(tableId()));
     int truncLength = "value_x".length();
-    config.put(
-        "partition_fields",
-        Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + 
truncLength + ")"));
+    List<String> partitionFields =
+        Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + 
truncLength + ")");
+    config.put("partition_fields", partitionFields);
     PCollection<Row> input = 
pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
     input.apply(Managed.write(ICEBERG).withConfig(config));
     pipeline.run().waitUntilFinish();
@@ -674,6 +676,13 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
     // Read back and check records are correct
     Table table = catalog.loadTable(TableIdentifier.parse(tableId()));
     List<Record> returnedRecords = readRecords(table);
+    PartitionSpec expectedSpec =
+        PartitionSpec.builderFor(table.schema())
+            .identity("bool_field")
+            .hour("datetime")
+            .truncate("str", truncLength)
+            .build();
+    assertEquals(expectedSpec, table.spec());
     assertThat(
         returnedRecords, 
containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray()));
   }
@@ -815,10 +824,8 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
     Table table3 = catalog.loadTable(TableIdentifier.parse(tableId() + 
"_3_d"));
     Table table4 = catalog.loadTable(TableIdentifier.parse(tableId() + 
"_4_e"));
 
-    org.apache.iceberg.Schema tableSchema =
-        IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
     for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
-      assertTrue(t.schema().sameSchema(tableSchema));
+      assertEquals(rowFilter.outputSchema(), 
icebergSchemaToBeamSchema(t.schema()));
     }
 
     // Read back and check records are correct
@@ -830,6 +837,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
             readRecords(table3),
             readRecords(table4));
 
+    org.apache.iceberg.Schema tableSchema = 
beamSchemaToIcebergSchema(rowFilter.outputSchema());
     SerializableFunction<Row, Record> recordFunc =
         row -> IcebergUtils.beamRowToIcebergRecord(tableSchema, row);
 
@@ -936,7 +944,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
             table3false,
             table4true,
             table4false)) {
-      assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA));
+      assertEquals(BEAM_SCHEMA, icebergSchemaToBeamSchema(t.schema()));
     }
 
     // Read back and check records are correct
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
new file mode 100644
index 00000000000..c16df763333
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/RESTCatalogBLMSIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.catalog;
+
+import java.util.Map;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.junit.After;
+import org.junit.BeforeClass;
+
+/** Tests for {@link org.apache.iceberg.rest.RESTCatalog} using BigLake 
Metastore. */
+public class RESTCatalogBLMSIT extends IcebergCatalogBaseIT {
+  private static Map<String, String> catalogProps;
+
+  // Using a special bucket for this test class because
+  // BigLake does not support using subfolders as a warehouse (yet)
+  private static final String BIGLAKE_WAREHOUSE = 
"gs://managed-iceberg-biglake-its";
+
+  @BeforeClass
+  public static void setup() {
+    warehouse = BIGLAKE_WAREHOUSE;
+    catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", "rest")
+            .put("uri", 
"https://biglake.googleapis.com/iceberg/v1/restcatalog")
+            .put("warehouse", BIGLAKE_WAREHOUSE)
+            .put("header.x-goog-user-project", OPTIONS.getProject())
+            .put("rest-metrics-reporting-enabled", "false")
+            .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
+            .put("rest.auth.type", 
"org.apache.iceberg.gcp.auth.GoogleAuthManager")
+            .build();
+  }
+
+  @After
+  public void after() {
+    // making sure the cleanup path is directed at the correct warehouse
+    warehouse = BIGLAKE_WAREHOUSE;
+  }
+
+  @Override
+  public String type() {
+    return "biglake";
+  }
+
+  @Override
+  public Catalog createCatalog() {
+    RESTCatalog restCatalog = new RESTCatalog();
+    restCatalog.initialize(catalogName, catalogProps);
+    return restCatalog;
+  }
+
+  @Override
+  public Map<String, Object> managedIcebergConfig(String tableId) {
+    return ImmutableMap.<String, Object>builder()
+        .put("table", tableId)
+        .put("catalog_properties", catalogProps)
+        .build();
+  }
+}

Reply via email to