This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d627b40898 [Iceberg] Add BQMS test that validates using a BQ query 
(#33625)
4d627b40898 is described below

commit 4d627b40898f64114aaebeffc7779db38e4a520d
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jan 24 19:29:22 2025 +0000

    [Iceberg] Add BQMS test that validates using a BQ query (#33625)
    
    * add test
    
    * spotless
    
    * another query test
---
 .../IO_Iceberg_Integration_Tests.json              |  2 +-
 sdks/java/io/iceberg/build.gradle                  |  2 +
 .../catalog/BigQueryMetastoreCatalogIT.java        | 77 +++++++++++++++++++++-
 .../sdk/io/iceberg/catalog/HadoopCatalogIT.java    |  2 +-
 .../io/iceberg/catalog/IcebergCatalogBaseIT.java   | 20 ++++--
 5 files changed, 91 insertions(+), 12 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 37dd25bf902..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-    "modification": 3
+    "modification": 1
 }
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index d84205791d4..8613a4b22c6 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -84,6 +84,8 @@ dependencies {
 
     // BigQueryMetastore catalog dep
     testImplementation project(path: ":sdks:java:io:iceberg:bqms", 
configuration: "shadow")
+    testImplementation project(":sdks:java:io:google-cloud-platform")
+    testImplementation library.java.google_api_services_bigquery
 
     testRuntimeOnly library.java.slf4j_jdk14
     testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 887b0cfa56c..3a8b47cb5a0 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -17,19 +17,49 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.RowFilter;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("BigQueryMetastoreCatalogIT");
   static final String BQMS_CATALOG = 
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
-  static final String DATASET = "managed_iceberg_bqms_tests_no_delete";
+  static final String DATASET = "managed_iceberg_bqms_tests_" + 
System.nanoTime();
   static final long SALT = System.nanoTime();
 
+  @BeforeClass
+  public static void createDataset() throws IOException, InterruptedException {
+    BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
+  }
+
+  @AfterClass
+  public static void deleteDataset() {
+    BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
+  }
+
   @Override
   public String tableId() {
     return DATASET + "." + testName.getMethodName() + "_" + SALT;
@@ -41,7 +71,7 @@ public class BigQueryMetastoreCatalogIT extends 
IcebergCatalogBaseIT {
         BQMS_CATALOG,
         "bqms_" + catalogName,
         ImmutableMap.<String, String>builder()
-            .put("gcp_project", options.getProject())
+            .put("gcp_project", OPTIONS.getProject())
             .put("gcp_location", "us-central1")
             .put("warehouse", warehouse)
             .build(),
@@ -65,7 +95,7 @@ public class BigQueryMetastoreCatalogIT extends 
IcebergCatalogBaseIT {
         .put(
             "catalog_properties",
             ImmutableMap.<String, String>builder()
-                .put("gcp_project", options.getProject())
+                .put("gcp_project", OPTIONS.getProject())
                 .put("gcp_location", "us-central1")
                 .put("warehouse", warehouse)
                 .put("catalog-impl", BQMS_CATALOG)
@@ -73,4 +103,45 @@ public class BigQueryMetastoreCatalogIT extends 
IcebergCatalogBaseIT {
                 .build())
         .build();
   }
+
+  @Test
+  public void testWriteToPartitionedAndValidateWithBQQuery()
+      throws IOException, InterruptedException {
+    // For an example row where bool=true, modulo_5=3, str=value_303,
+    // this partition spec will create a partition like: 
/bool=true/modulo_5=3/str_trunc=value_3/
+    PartitionSpec partitionSpec =
+        PartitionSpec.builderFor(ICEBERG_SCHEMA)
+            .identity("bool")
+            .hour("datetime")
+            .truncate("str", "value_x".length())
+            .build();
+    catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, 
partitionSpec);
+
+    // Write with Beam
+    Map<String, Object> config = managedIcebergConfig(tableId());
+    PCollection<Row> input = 
pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
+    input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
+    pipeline.run().waitUntilFinish();
+
+    // Fetch records using a BigQuery query and validate
+    BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
+    String query = String.format("SELECT * FROM `%s.%s`", 
OPTIONS.getProject(), tableId());
+    List<TableRow> rows = bqClient.queryUnflattened(query, 
OPTIONS.getProject(), true, true);
+    List<Row> beamRows =
+        rows.stream()
+            .map(tr -> BigQueryUtils.toBeamRow(BEAM_SCHEMA, tr))
+            .collect(Collectors.toList());
+
+    assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));
+
+    String queryByPartition =
+        String.format("SELECT bool, datetime FROM `%s.%s`", 
OPTIONS.getProject(), tableId());
+    rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), 
true, true);
+    RowFilter rowFilter = new 
RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool", "datetime"));
+    beamRows =
+        rows.stream()
+            .map(tr -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tr))
+            .collect(Collectors.toList());
+    assertThat(beamRows, 
containsInAnyOrder(inputRows.stream().map(rowFilter::filter).toArray()));
+  }
 }
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
index 4e6766cc496..dc5e3b26324 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java
@@ -37,7 +37,7 @@ public class HadoopCatalogIT extends IcebergCatalogBaseIT {
   @Override
   public Catalog createCatalog() {
     Configuration catalogHadoopConf = new Configuration();
-    catalogHadoopConf.set("fs.gs.project.id", options.getProject());
+    catalogHadoopConf.set("fs.gs.project.id", OPTIONS.getProject());
     catalogHadoopConf.set("fs.gs.auth.type", "APPLICATION_DEFAULT");
 
     HadoopCatalog catalog = new HadoopCatalog();
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index f11c77ccc68..518470138e9 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -147,7 +147,12 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
 
   @Before
   public void setUp() throws Exception {
-    options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
+    warehouse =
+        String.format(
+            "%s/%s/%s",
+            TestPipeline.testingPipelineOptions().getTempLocation(),
+            getClass().getSimpleName(),
+            RANDOM);
     warehouse = warehouse(getClass());
     catalogSetup();
     catalog = createCatalog();
@@ -162,7 +167,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
     }
 
     try {
-      GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil();
+      GcsUtil gcsUtil = OPTIONS.as(GcsOptions.class).getGcsUtil();
       GcsPath path = GcsPath.fromUri(warehouse);
 
       @Nullable
@@ -190,7 +195,8 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
 
   protected static String warehouse;
   public Catalog catalog;
-  protected static GcpOptions options;
+  protected static final GcpOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class);
   private static final String RANDOM = UUID.randomUUID().toString();
   @Rule public TestPipeline pipeline = TestPipeline.create();
   @Rule public TestName testName = new TestName();
@@ -210,7 +216,7 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
           .addInt32Field("nested_int")
           .addFloatField("nested_float")
           .build();
-  private static final Schema BEAM_SCHEMA =
+  protected static final Schema BEAM_SCHEMA =
       Schema.builder()
           .addStringField("str")
           .addStringField("char")
@@ -262,16 +268,16 @@ public abstract class IcebergCatalogBaseIT implements 
Serializable {
         }
       };
 
-  private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
+  protected static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
       IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
-  private static final SimpleFunction<Row, Record> RECORD_FUNC =
+  protected static final SimpleFunction<Row, Record> RECORD_FUNC =
       new SimpleFunction<Row, Record>() {
         @Override
         public Record apply(Row input) {
           return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
         }
       };
-  private final List<Row> inputRows =
+  protected final List<Row> inputRows =
       LongStream.range(0, 
numRecords()).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
 
   /** Populates the Iceberg table and Returns a {@link List<Row>} of expected 
elements. */

Reply via email to