tarun-google commented on code in PR #35722: URL: https://github.com/apache/beam/pull/35722#discussion_r2240207592
########## examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.cookbook; + +import java.io.IOException; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; 
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * This pipeline demonstrates a batch write to an Iceberg table using the BigQuery Metastore + * catalog. + * + * <p>The pipeline reads from a public BigQuery table containing Google Analytics session data, + * extracts and aggregates the total number of transactions per web browser, and writes the results + * to a new Iceberg table managed by the BigQuery Metastore. + * + * <p>This example is a demonstration of the Iceberg BigQuery Metastore. For more information, see + * the documentation at {@link https://cloud.google.com/bigquery/docs/blms-use-dataproc}. + */ +public class IcebergBatchWriteExample { + + public static final Schema BQ_SCHEMA = + Schema.builder().addStringField("browser").addInt64Field("transactions").build(); + + public static final Schema AGGREGATED_SCHEMA = + Schema.builder().addStringField("browser").addInt64Field("transaction_count").build(); + + static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> { + @ProcessElement + public void processElement(ProcessContext c) { + Row row = c.element(); + c.output( + KV.of( + Preconditions.checkStateNotNull(row.getString("browser")), + Preconditions.checkStateNotNull(row.getInt64("transactions")))); + } + } + + static class FormatCountsFn extends DoFn<KV<String, Long>, Row> { + @ProcessElement + public void processElement(ProcessContext c) { + Row row = + Row.withSchema(AGGREGATED_SCHEMA) + .withFieldValue("browser", c.element().getKey()) + .withFieldValue("transaction_count", c.element().getValue()) + .build(); + c.output(row); + } + } + + static class CountTransactions extends PTransform<PCollection<Row>, PCollection<Row>> { + @Override + public PCollection<Row> expand(PCollection<Row> rows) { + PCollection<KV<String, Long>> browserTransactions = + rows.apply(ParDo.of(new ExtractBrowserTransactionsFn())); + PCollection<KV<String, Long>> browserCounts = 
        browserTransactions.apply(Sum.longsPerKey()); +      return browserCounts.apply(ParDo.of(new FormatCountsFn())); +    } +  } + +  /** Pipeline options for this example. */ +  public interface IcebergPipelineOptions extends GcpOptions { +    @Description( +        "Warehouse location where the table's data will be written to. " +            + "As of 07/14/25 BigLake only supports Single Region buckets") Review Comment: I don't fully agree with this suggestion. The date signals that this restriction may change in future versions, so users experimenting with this example know to check whether it still applies. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org