This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 720f9617be0 Metastore example (#35722)
720f9617be0 is described below

commit 720f9617be059492f9f4318d808f4a42ac20c9a6
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Tue Aug 5 12:00:50 2025 -0700

    Metastore example (#35722)
    
    * Example for BigLake Metastore
    
    * Make dependency lean
    
    * fix comments
    
    * Fix gradle
    
    * fix gradle
    
    * fix BQ table name
---
 examples/java/build.gradle                         |   1 +
 .../cookbook/IcebergBatchWriteExample.java         | 208 +++++++++++++++++++++
 2 files changed, 209 insertions(+)

diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 5a1d5b2e8fd..0f1a1f7ef7e 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -72,6 +72,7 @@ dependencies {
   implementation project(":sdks:java:io:google-cloud-platform")
   implementation project(":sdks:java:io:kafka")
   runtimeOnly project(":sdks:java:io:iceberg")
+  runtimeOnly project(":sdks:java:io:iceberg:bqms")
   implementation project(":sdks:java:managed")
   implementation project(":sdks:java:extensions:ml")
   implementation library.java.avro
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java
 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java
new file mode 100644
index 00000000000..458f2b54545
--- /dev/null
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * This pipeline demonstrates a batch write to an Iceberg table using the 
BigQuery Metastore
+ * catalog.
+ *
+ * <p>The pipeline reads from a public BigQuery table containing Google 
Analytics session data,
+ * extracts and aggregates the total number of transactions per web browser, 
and writes the results
+ * to a new Iceberg table managed by the BigQuery Metastore.
+ *
+ * <p>This example is a demonstration of the Iceberg BigQuery Metastore. For 
more information, see
+ * the documentation at 
https://cloud.google.com/bigquery/docs/blms-use-dataproc.
+ */
+public class IcebergBatchWriteExample {
+
+  public static final Schema BQ_SCHEMA =
+      
Schema.builder().addStringField("browser").addInt64Field("transactions").build();
+
+  public static final Schema AGGREGATED_SCHEMA =
+      
Schema.builder().addStringField("browser").addInt64Field("transaction_count").build();
+
+  public static final String BQ_TABLE =
+      "bigquery-public-data.google_analytics_sample.ga_sessions_20170801";
+
+  private static Row flattenAnalyticsRow(Row row) {
+    Row device = Preconditions.checkStateNotNull(row.getRow("device"));
+    Row totals = Preconditions.checkStateNotNull(row.getRow("totals"));
+    return Row.withSchema(BQ_SCHEMA)
+        .withFieldValue("browser", 
Preconditions.checkStateNotNull(device.getString("browser")))
+        .withFieldValue(
+            "transactions", 
Preconditions.checkStateNotNull(totals.getInt64("transactions")))
+        .build();
+  }
+
+  static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, 
Long>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      c.output(
+          KV.of(
+              Preconditions.checkStateNotNull(row.getString("browser")),
+              Preconditions.checkStateNotNull(row.getInt64("transactions"))));
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, Long>, Row> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row =
+          Row.withSchema(AGGREGATED_SCHEMA)
+              .withFieldValue("browser", c.element().getKey())
+              .withFieldValue("transaction_count", c.element().getValue())
+              .build();
+      c.output(row);
+    }
+  }
+
+  static class CountTransactions extends PTransform<PCollection<Row>, 
PCollection<Row>> {
+    @Override
+    public PCollection<Row> expand(PCollection<Row> rows) {
+      PCollection<KV<String, Long>> browserTransactions =
+          rows.apply(ParDo.of(new ExtractBrowserTransactionsFn()));
+      PCollection<KV<String, Long>> browserCounts = 
browserTransactions.apply(Sum.longsPerKey());
+      return browserCounts.apply(ParDo.of(new FormatCountsFn()));
+    }
+  }
+
+  /** Pipeline options for this example. */
+  public interface IcebergPipelineOptions extends GcpOptions {
+    @Description(
+        "Warehouse location where the table's data will be written to. "
+            + "As of 07/14/25 BigLake only supports Single Region buckets")
+    @Validation.Required
+    @Default.String("gs://analytics_warehouse")
+    String getWarehouse();
+
+    void setWarehouse(String warehouse);
+
+    @Description("The Iceberg table to write to, in the format 
'dataset.table'.")
+    @Validation.Required
+    @Default.String("analytics_dataset.transactions_by_browser")
+    String getIcebergTable();
+
+    void setIcebergTable(String value);
+
+    @Description("The name of the catalog to use.")
+    @Validation.Required
+    @Default.String("analytics")
+    String getCatalogName();
+
+    void setCatalogName(String catalogName);
+
+    @Description("The implementation of the Iceberg catalog.")
+    @Default.String("org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
+    String getCatalogImpl();
+
+    void setCatalogImpl(String catalogImpl);
+
+    @Description("The GCP location for the BigQuery Metastore.")
+    @Default.String("us-central1")
+    String getGcpLocation();
+
+    void setGcpLocation(String gcpLocation);
+
+    @Description("The implementation of the Iceberg FileIO.")
+    @Default.String("org.apache.iceberg.gcp.gcs.GCSFileIO")
+    String getIoImpl();
+
+    void setIoImpl(String ioImpl);
+  }
+
+  /**
+   * Main entry point for the pipeline.
+   *
+   * @param args Command line arguments
+   * @throws IOException if there's an issue with the pipeline setup
+   */
+  public static void main(String[] args) throws IOException {
+    IcebergPipelineOptions options =
+        
PipelineOptionsFactory.fromArgs(args).withValidation().as(IcebergPipelineOptions.class);
+
+    final String tableIdentifier = options.getIcebergTable();
+    final String warehouseLocation = options.getWarehouse();
+    final String catalogName = options.getCatalogName();
+    final String projectName = options.getProject();
+    final String catalogImpl = options.getCatalogImpl();
+    final String gcpLocation = options.getGcpLocation();
+    final String ioImpl = options.getIoImpl();
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("warehouse", warehouseLocation)
+            .put("catalog-impl", catalogImpl)
+            .put("gcp_project", projectName)
+            .put("gcp_location", gcpLocation)
+            .put("io-impl", ioImpl)
+            .build();
+
+    Map<String, Object> icebergWriteConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("table", tableIdentifier)
+            .put("catalog_properties", catalogProps)
+            .put("catalog_name", catalogName)
+            .build();
+
+    Map<String, Object> bigQueryReadConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("table", BQ_TABLE)
+            .put("fields", ImmutableList.of("device.browser", 
"totals.transactions"))
+            .put("row_restriction", "totals.transactions is not null")
+            .build();
+
+    Pipeline p = Pipeline.create(options);
+
+    p.apply("ReadFromBigQuery", 
Managed.read(Managed.BIGQUERY).withConfig(bigQueryReadConfig))
+        .get("output")
+        .apply(
+            "Flatten",
+            MapElements.into(TypeDescriptors.rows())
+                .via(IcebergBatchWriteExample::flattenAnalyticsRow))
+        .setRowSchema(BQ_SCHEMA)
+        .apply("CountTransactions", new CountTransactions())
+        .setRowSchema(AGGREGATED_SCHEMA)
+        .apply("WriteToIceberg", 
Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
+
+    p.run().waitUntilFinish();
+  }
+}

Reply via email to