This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b075406fc0f Add Iceberg Rest Catalog Example (#35599)
b075406fc0f is described below

commit b075406fc0fd8caf1944a8c4a12d4598f60f4adc
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Sat Jul 19 14:14:26 2025 -0700

    Add Iceberg Rest Catalog Example (#35599)
    
    * Add Iceberg Rest Catalog Example
    
    * Add Iceberg Rest Catalog Example
    
    * fix imports
    
    * fix imports
    
    * fix comments
    
    * change default bucket name
    
    * fix null check
    
    * simplify kv
---
 .../IcebergRestCatalogStreamingWriteExample.java   | 198 +++++++++++++++++++++
 1 file changed, 198 insertions(+)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java
new file mode 100644
index 00000000000..d41ef885f59
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergRestCatalogStreamingWriteExample.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+
+/**
+ * Reads real-time NYC taxi ride information from {@code
+ * projects/pubsub-public-data/topics/taxirides-realtime} and writes aggregated passenger count data
+ * to an Iceberg table using Beam's {@link Managed} IcebergIO sink.
+ *
+ * <p>This is a streaming pipeline that processes taxi ride events, filters for 'dropoff' status,
+ * aggregates passenger counts within fixed 10-second windows by minute of the ride, and writes the
+ * results to a single Iceberg table. The Iceberg sink triggers writes every 30 seconds, creating
+ * new snapshots.
+ *
+ * <p>This example is a demonstration of the Iceberg REST Catalog. For more information, see the
+ * documentation at {@link https://cloud.google.com/bigquery/docs/blms-rest-catalog}.
+ */
+public class IcebergRestCatalogStreamingWriteExample {
+
+  public static final Schema TAXIRIDES_SCHEMA =
+      Schema.builder()
+          .addInt64Field("passenger_count")
+          .addStringField("ride_status")
+          .addDateTimeField("timestamp")
+          .build();
+
+  public static final Schema AGGREGATED_SCHEMA =
+      Schema.builder().addStringField("ride_minute").addInt64Field("passenger_count").build();
+
+  /**
+   * Main entry point for the pipeline.
+   *
+   * @param args Command line arguments
+   * @throws IOException if there's an issue with GoogleCredentials
+   */
+  public static void main(String[] args) throws IOException {
+    IcebergPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(IcebergPipelineOptions.class);
+    options.setProject("apache-beam-testing");
+
+    final String tableIdentifier = "taxi_dataset.passenger_count_by_minute";
+    final String pubsubTopic = options.getTopic();
+    final String catalogUri = options.getCatalogUri();
+    final String warehouseLocation = options.getWarehouse();
+    final String projectName = options.getProject();
+    final String catalogName = options.getCatalogName();
+    final int triggeringFrequencySeconds = 30;
+
+    // Note: The token expires in 1 hour, and users may need to re-run the pipeline.
+    // Upcoming changes in Iceberg and the BigLake Metastore with the Iceberg REST Catalog
+    // will support token refreshing and credential vending.
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", "rest")
+            .put("uri", catalogUri)
+            .put("warehouse", warehouseLocation)
+            .put("header.x-goog-user-project", projectName)
+            .put("oauth2-server-uri", "https://oauth2.googleapis.com/token")
+            .put(
+                "token",
+                GoogleCredentials.getApplicationDefault().refreshAccessToken().getTokenValue())
+            .put("rest-metrics-reporting-enabled", "false")
+            .build();
+
+    Map<String, Object> icebergWriteConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("table", tableIdentifier)
+            .put("catalog_properties", catalogProps)
+            .put("catalog_name", catalogName)
+            .put("triggering_frequency_seconds", triggeringFrequencySeconds)
+            .build();
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<Row> aggregatedRows =
+        p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(pubsubTopic))
+            .apply("ConvertJsonToRow", JsonToRow.withSchema(TAXIRIDES_SCHEMA))
+            .apply(
+                "FilterNullFields",
+                Filter.by(
+                    (Row row) ->
+                        row.getInt64("passenger_count") != null
+                            && row.getDateTime("timestamp") != null))
+            .apply(
+                "FilterDropoffRides",
+                Filter.by((Row row) -> "dropoff".equals(row.getString("ride_status"))))
+            .apply(
+                "ApplyFixedWindow", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(10))))
+            .apply(
+                "ExtractMinuteAndPassengerCount",
+                MapElements.into(
+                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
+                    .via(
+                        row ->
+                            KV.of(
+                                Preconditions.checkStateNotNull(row.getDateTime("timestamp"))
+                                    .toString("yyyy-MM-dd HH:mm"),
+                                row.getInt64("passenger_count"))))
+            .apply("SumPassengerCountPerMinute", Sum.longsPerKey())
+            .apply(
+                "FormatAggregatedRowForIceberg",
+                ParDo.of(
+                    new DoFn<KV<String, Long>, Row>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KV<String, Long> kv, OutputReceiver<Row> out) {
+                        Row row =
+                            Row.withSchema(AGGREGATED_SCHEMA)
+                                .withFieldValue("ride_minute", kv.getKey())
+                                .withFieldValue("passenger_count", kv.getValue())
+                                .build();
+                        out.output(row);
+                      }
+                    }))
+            .setCoder(RowCoder.of(AGGREGATED_SCHEMA));
+
+    aggregatedRows.apply(
+        "WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
+
+    p.run().waitUntilFinish();
+  }
+
+  /** Pipeline options for the IcebergRestCatalogStreamingWriteExample. */
+  public interface IcebergPipelineOptions extends GcpOptions {
+    @Description(
+        "Warehouse location where the table's data will be written to. "
+            + "As of 07/14/25 BigLake only supports Single Region buckets")
+    @Validation.Required
+    @Default.String("gs://biglake_taxi_rides")
+    String getWarehouse();
+
+    void setWarehouse(String warehouse);
+
+    @Description("The URI for the REST catalog.")
+    @Validation.Required
+    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
+    String getCatalogUri();
+
+    void setCatalogUri(String value);
+
+    @Description("The Pub/Sub topic to read from.")
+    @Validation.Required
+    @Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
+    String getTopic();
+
+    void setTopic(String value);
+
+    @Validation.Required
+    @Default.String("taxi_rides")
+    String getCatalogName();
+
+    void setCatalogName(String catalogName);
+  }
+}

Reply via email to