gemini-code-assist[bot] commented on code in PR #35722: URL: https://github.com/apache/beam/pull/35722#discussion_r2244038937
########## examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergBatchWriteExample.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.cookbook; + +import java.io.IOException; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; 
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * This pipeline demonstrates a batch write to an Iceberg table using the BigQuery Metastore + * catalog. + * + * <p>The pipeline reads from a public BigQuery table containing Google Analytics session data, + * extracts and aggregates the total number of transactions per web browser, and writes the results + * to a new Iceberg table managed by the BigQuery Metastore. + * + * <p>This example is a demonstration of the Iceberg BigQuery Metastore. For more information, see + * the documentation at {@link https://cloud.google.com/bigquery/docs/blms-use-dataproc}. + */ +public class IcebergBatchWriteExample { + + public static final Schema BQ_SCHEMA = + Schema.builder().addStringField("browser").addInt64Field("transactions").build(); + + public static final Schema AGGREGATED_SCHEMA = + Schema.builder().addStringField("browser").addInt64Field("transaction_count").build(); + + static class ExtractBrowserTransactionsFn extends DoFn<Row, KV<String, Long>> { + @ProcessElement + public void processElement(ProcessContext c) { + Row row = c.element(); + c.output( + KV.of( + Preconditions.checkStateNotNull(row.getString("browser")), + Preconditions.checkStateNotNull(row.getInt64("transactions")))); + } + } + + static class FormatCountsFn extends DoFn<KV<String, Long>, Row> { + @ProcessElement + public void processElement(ProcessContext c) { + Row row = + Row.withSchema(AGGREGATED_SCHEMA) + .withFieldValue("browser", c.element().getKey()) + .withFieldValue("transaction_count", c.element().getValue()) + .build(); + c.output(row); + } + } + + static class CountTransactions extends PTransform<PCollection<Row>, PCollection<Row>> { + @Override + public PCollection<Row> expand(PCollection<Row> rows) { + PCollection<KV<String, Long>> browserTransactions = + rows.apply(ParDo.of(new ExtractBrowserTransactionsFn())); + PCollection<KV<String, Long>> browserCounts = 
browserTransactions.apply(Sum.longsPerKey()); + return browserCounts.apply(ParDo.of(new FormatCountsFn())); + } + } + + /** Pipeline options for this example. */ + public interface IcebergPipelineOptions extends GcpOptions { + @Description( + "Warehouse location where the table's data will be written to. " + + "As of 07/14/25 BigLake only supports Single Region buckets") Review Comment: Thank you for your feedback, @tarun-google. You raise a valid point about providing context for the current limitations. I apologize for my previous comment; I should not have referenced a future date in the description. While providing context is important, hardcoded dates in code comments quickly become stale and can confuse future readers. Information about *why* a feature might change or when a limitation might be lifted is often better suited for more dynamic documentation (e.g., a changelog, product documentation, or a dedicated `README` section) that can be updated without modifying code. The code comment itself should describe the current limitation clearly and concisely. I recommend rewording the comment to state the current limitation without a specific date, which keeps it accurate and prevents it from becoming outdated. ```suggestion "Warehouse location where the table's data will be written to. " + "Note: BigLake only supports Single Region buckets for the warehouse location.") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org