damccorm commented on code in PR #30430: URL: https://github.com/apache/beam/pull/30430#discussion_r1517826022
########## website/www/site/content/en/documentation/io/built-in/webapis.md: ########## @@ -0,0 +1,536 @@ +--- +title: "Web Apis I/O connector" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +[Built-in I/O Transforms](/documentation/io/built-in/) + +# Web APIs I/O connector + +{{< language-switcher java >}} + +The Beam SDKs include a built-in transform, called RequestResponseIO to support reads and writes with Web APIs such as +REST or gRPC. + +Discussion below focuses on the Java SDK. Python examples will be added in the future; see tracker issue: +[#30422](https://github.com/apache/beam/issues/30422). Additionally, support for the Go SDK is not yet available; +see tracker issue: [#30423](https://github.com/apache/beam/issues/30423). + + +## RequestResponseIO Features + +Features this transform provides include: +* developers provide minimal code that invokes Web API endpoint +* delegate to the transform to handle request retries and exponential backoff +* optional caching of request and response associations +* optional metrics + +This guide currently focuses on the first two bullet points above, the minimal code requirements and error handling. +In the future, it may be expanded to show examples of additional features. Links to additional resources is +provided below. 
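To make the first two bullets concrete, here is a minimal, Beam-free sketch of the division of labor: user code only converts a request into a response, while a wrapper retries selected failures with exponential backoff. `SketchCaller`, `RetryableException`, and `BackoffSketch` are hypothetical names, and the retry limit and delays are arbitrary assumptions. None of this reflects how `RequestResponseIO` is actually implemented or configured.

```java
// Hypothetical stand-in for user code: turns a request into a response or throws.
interface SketchCaller<RequestT, ResponseT> {
  ResponseT call(RequestT request) throws Exception;
}

// Hypothetical marker for errors that are worth retrying (for example, a timeout).
class RetryableException extends Exception {
  RetryableException(String message) {
    super(message);
  }
}

final class BackoffSketch {

  // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMillis.
  static long delayMillis(int attempt, long baseMillis, long maxMillis) {
    long delay = baseMillis << Math.min(attempt, 20);
    return Math.min(delay, maxMillis);
  }

  // Invokes the caller and retries RetryableException up to maxRetries times.
  // Any other exception propagates immediately without a retry.
  static <RequestT, ResponseT> ResponseT callWithBackoff(
      SketchCaller<RequestT, ResponseT> caller,
      RequestT request,
      int maxRetries,
      long baseMillis,
      long maxMillis)
      throws Exception {
    for (int attempt = 0; ; attempt++) {
      try {
        return caller.call(request);
      } catch (RetryableException e) {
        if (attempt >= maxRetries) {
          // Retries exhausted: surface the error so it can go to a failure output.
          throw e;
        }
        Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
      }
    }
  }
}
```

With `RequestResponseIO` you would write only the equivalent of `SketchCaller`; the wrapper logic stands in for what the guide says the transform handles on your behalf.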
+ +## Additional resources + +{{< paragraph class="language-java" wrap="span" >}} +* [RequestResponseIO source code](https://github.com/apache/beam/tree/master/sdks/java/io/rrio) +* [RequestResponseIO Javadoc](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html) + {{< /paragraph >}} + +## Before you start + +{{< paragraph class="language-java" >}} +To use RequestResponseIO, add the dependency to your [Gradle](https://gradle.org) `build.gradle(.kts)` or +[Maven](https://maven.apache.org/) `pom.xml` file. See +[Maven Central](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio) for available versions. +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} +Below shows an example adding the [Beam BOM](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom) +and related dependencies such as Beam core to your `build.gradle(.kts)` file. +{{< /paragraph >}} + +{{< highlight java >}} +// Apache Beam BOM +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom +implementation("org.apache.beam:beam-sdks-java-bom:{{< param release_latest >}}") + +// Beam Core SDK +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core +implementation("org.apache.beam:beam-sdks-java-core") + +// RequestResponseIO dependency +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio +implementation("org.apache.beam:beam-sdks-java-io-rrio") +{{< /highlight >}} + +{{< paragraph class="language-java" >}} +Or using Maven, add the artifact dependency to your `pom.xml` file. 
+{{< /paragraph >}} + +{{< highlight java >}} +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-rrio</artifactId> + <version>{{< param release_latest >}}</version> +</dependency> +{{< /highlight >}} + + +## RequestResponseIO basics + +### Minimal code + +The minimal code needed to read from or write to Web APIs is: + +{{< paragraph class="language-java" wrap="span" >}} +1. [Caller](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html) implementation. +2. Instantiate [RequestResponseIO](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html). +{{< /paragraph >}} + +#### Implementing the Caller + +{{< paragraph class="language-java" >}} +[Caller](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html) requires +only one method override: [call](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html#call-RequestT-), whose +purpose is to interact with the API, converting a request into a response. +The transform's DoFn invokes this method within its +[DoFn.ProcessElement](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html) +method. The transform handles everything else including repeating failed requests and exponential backoff +(discussed more below). +{{< /paragraph >}} + +{{< highlight java >}} +// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse. +class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> { + + @Override + public MyResponse call(MyRequest request) throws UserCodeExecutionException { + + // Do something with request and return the response. 
+ + } + +} +{{< /highlight >}} + +#### Instantiate RequestResponseIO + +{{< paragraph class="language-java" >}} +Using [RequestResponseIO](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html) +is as simple as shown below. As mentioned, it minimally requires two parameters: the `Caller` and the expected +[Coder](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/Coder.html) of the response. (_Note: If the concept of a Beam Coder is new to you, please see the +[Apache Beam Programming Guide](/documentation/programming-guide/#data-encoding-and-type-safety) +on this subject. This guide also has an example below._) +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} +The `RequestResponseIO` transform returns a [Result](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html) +that bundles any failures and the `PCollection` of successful responses. In Beam, we call this the +[additional outputs](/documentation/programming-guide/#additional-outputs) pattern, +which typically requires a bit of boilerplate but the transform takes care of it for you. Using the transform, +you get the success and failure `PCollection`s via +[Result::getFailures](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html#getFailures--) +and [Result::getResponses](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html#getResponses--). + +Below shows an abbreviated snippet how the transform may work in your pipeline. +{{< /paragraph >}} + +{{< highlight java >}} +// Step 1. Define the Coder for the response. +Coder<MyResponse> responseCoder = ... + +// Step 2. Build the request PCollection. +PCollection<MyRequest> requests = ... + +// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection. 
+Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder)); + +// Step 4a. Do something with the responses. +result.getResponses().apply( ... ); + +// Step 4b. Apply failures to a dead letter sink. +result.getFailures().apply( ... ); + +{{< /highlight >}} + +{{< paragraph >}} +`RequestResponseIO` takes care of everything else needed to invoke the `Caller` for each request. It doesn't care what +you do inside your `Caller`, whether you make raw HTTP calls or use client code. Later this guide discusses the +advantage of this design for testing. +{{< /paragraph >}} + +### API call repeats and failures + +As mentioned above, `RequestResponseIO` returns a +[Result](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html) +that bundles both the success and failure `PCollection`s resulting from you `Caller`. This section provides +a little more detail about handling failures and specifics on API call repeats with backoff. + +#### Handling failures + +The failures are an +[ApiIOError](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/ApiIOError.html) +`PCollection` that you may apply to a logging transform or a transform that +saves the errors to a downstream sink for later analysis and troubleshooting. + +Since `ApiIOError` is already mapped to a Beam Schema, it has compatibility with most of Beam's existing I/O +connectors. +(_Note: If the concept of Beam Schemas is new to you, please see the +[Beam Programming Guide](/documentation/programming-guide/#schemas)._) +For example, you can easily send `ApiIOError` records to BigQuery for analysis and troubleshooting as shown +below **without** converting the records first to a +[TableRow](https://www.javadoc.io/doc/com.google.apis/google-api-services-bigquery/v2-rev20230812-2.0.0/com/google/api/services/bigquery/model/TableRow.html). 
+ +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/AdditionalSnippets.java" webapis_java_write_failures_bigquery >}} +{{< /highlight >}} + +#### API call repeats and backoff + +Prior to emitting to the failure `PCollection`, the transform performs a retry **for certain errors** +after a prescribed exponential backoff. Your `Caller` must throw specific errors, to signal the transform +to perform the retry with backoff. Throwing a +[UserCodeExecutionException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeExecutionException.html) +will immediately emit the error into the `ApiIOError` `PCollection`. + +`RequestResponseIO` will attempt a retry with backoff when `Caller` throws: +* [UserCodeQuotaException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeQuotaException.html) +* [UserCodeRemoteSystemException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.html) +* [UserCodeTimeoutException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeTimeoutException.html) + +After a threshold number of retries, the error is emitted into the failure `PCollection`. + +#### Testing + +Since `RequestResponseIO` doesn't care what you do inside your `Caller` implementation, this makes some testing more convenient. +Instead of relying on direct calls to a real API within some tests, consequently depending on your external resource, +you simply implement a version of your `Caller` +returning responses or throwing exceptions, according to your test logic. +For example, if you want to test a downstream step in your pipeline for a specific response, say empty records, you +could easily do so via the following. 
For more information on testing your Beam Pipelines, see +the [Beam Programming Guide](/documentation/pipelines/test-your-pipeline/). + +{{< highlight java >}} + +@Test +void givenEmptyResponse_thenExpectSomething() { + // Test expects PTransform underTest should do something as a result of empty records, for example. + PTransform<Iterable<String>, ?> underTest = ... + + PCollection<String> requests = pipeline.apply(Create.of("aRequest")); + IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of()); + Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder); + + PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...) + + pipeline.run(); +} + +// MockEmptyIterableResponse simulates when there are no results from the API. +class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> { +@Override + public Iterable<String> call(String request) throws UserCodeExecutionException { + return Collections.emptyList(); + } +} +{{< /highlight >}} + +## Practical examples + +Below shows two examples that we will bring together in an end-to-end Beam pipeline. The goal of this pipeline is to +download images and use +[Gemini on Vertex AI](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstarts/quickstart-multimodal) +to recognize the image content. + +Note that this example does not replace our current AI/ML solutions. Please see +[Get started with AI/ML pipelines](/documentation/ml/overview/) +for more details on using Beam with AI/ML. + +### Working with HTTP calls directly + +We first need to download images. To do so, we need to make HTTP calls to the image URL and emit their content +into a `PCollection` for use with the Gemini API. The value of this example on its own is that it demonstrates +how to use `RequestResponseIO` to make raw HTTP requests. 
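When a `Caller` makes raw HTTP requests, it also has to decide which exception to throw for a given response, since the guide above ties retries to specific exception types. The sketch below shows one possible mapping from status codes to those types. The four exception classes are local stand-ins declared only so the example compiles without Beam, and `HttpStatusSketch` and its status-code choices are assumptions for illustration, not the mapping used by the guide's `HttpImageClient`.

```java
// Local stand-ins named after the Beam exception types discussed in this guide.
// They are assumptions declared here so the sketch compiles without the Beam
// dependency; real code would import the library classes instead.
class UserCodeExecutionException extends Exception {
  UserCodeExecutionException(String message) {
    super(message);
  }
}

class UserCodeQuotaException extends UserCodeExecutionException {
  UserCodeQuotaException(String message) {
    super(message);
  }
}

class UserCodeRemoteSystemException extends UserCodeExecutionException {
  UserCodeRemoteSystemException(String message) {
    super(message);
  }
}

class UserCodeTimeoutException extends UserCodeExecutionException {
  UserCodeTimeoutException(String message) {
    super(message);
  }
}

final class HttpStatusSketch {

  // Hypothetical helper: throws the exception a Caller might use for an HTTP status.
  // Status codes below 400 are treated as success and return normally.
  static void throwIfError(int statusCode, String reason) throws UserCodeExecutionException {
    if (statusCode < 400) {
      return;
    }
    if (statusCode == 429) {
      // Too Many Requests: signal a quota problem, which the guide lists as retryable.
      throw new UserCodeQuotaException(reason);
    }
    if (statusCode == 408 || statusCode == 504) {
      // Request or gateway timeout: signal a timeout, also listed as retryable.
      throw new UserCodeTimeoutException(reason);
    }
    if (statusCode >= 500) {
      // Other server-side errors: signal a remote system failure, listed as retryable.
      throw new UserCodeRemoteSystemException(reason);
    }
    // Remaining client errors: the base type, which the guide says is emitted immediately.
    throw new UserCodeExecutionException(reason);
  }
}
```

A `call` implementation could invoke such a helper right after executing the request and before building the response.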
+ +#### Define Caller + +We implement the `Caller`, the `HttpImageClient`, that receives an `ImageRequest` and returns an `ImageResponse`. + +_For demo purposes, the example uses a +[KV](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/KV.html) +to preserve the raw URL in the returned `ImageResponse` containing `KV`._ + +##### Abbreviated snippet + +Below shows an abbreviated version of the `HttpImageClient` showing the important parts. + +{{< highlight java >}} +class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> { + + private static final HttpRequestFactory REQUEST_FACTORY = + new NetHttpTransport().createRequestFactory(); + + @Override + public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException { + + ImageRequest request = requestKV.getValue(); + GenericUrl url = new GenericUrl(request.getImageUrl()); + HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url); + HttpResponse response = imageRequest.execute(); + + return KV.of( + requestKV.getKey(), + ImageResponse + .builder() + // Build ImageResponse from HttpResponse + .build() + ); + } + +} +{{< /highlight >}} + +##### Full example + +The full implementation is shown below illustrating throwing various exceptions based on the HTTP response code. + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/HttpImageClient.java" webapis_java_image_caller >}} +{{< /highlight >}} + +#### Define request + +`ImageRequest` is the custom request we provide the `HttpImageClient`, defined in the example above, to invoke the HTTP call +that acquires the image. 
+{{< paragraph class="language-java" wrap="span" >}}This example happens to use [Google AutoValue](https://github.com/google/auto/blob/main/value/userguide/index.md), +but you can use any custom `Serializable` Java class as you would in any Beam `PCollection`, +including inherent Java classes such as `String`, `Double`, etc. For convenience, this example uses +`@DefaultSchema(AutoValueSchema.class)` allowing us to map our custom type to a +[Beam Schema](/documentation/programming-guide/#schemas) automatically based on its getters. +{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageRequest.java" webapis_java_image_request >}} +{{< /highlight >}} + +#### Define response + +`ImageResponse` is the custom response we return from the `HttpImageClient`, defined in the example above, that contains the image data +as a result of calling the remote server with the image URL. {{< paragraph class="language-java" wrap="span" >}}Again, +this example happens to use [Google AutoValue](https://github.com/google/auto/blob/main/value/userguide/index.md), +but you can use any custom `Serializable` Java class as you would in any Beam `PCollection` +including inherent Java classes such as `String`, `Double`, etc.{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageResponse.java" webapis_java_image_response >}} +{{< /highlight >}} + +#### Define response coder + +{{< paragraph class="language-java" >}} +`RequestResponseIO` needs the response's +[Coder](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/Coder.html) +as its second required parameter, shown in the example below. Please see the +[Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety) +for more information about Beam Coders. 
+{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageResponseCoder.java" webapis_java_image_response_coder >}} +{{< /highlight >}} + +#### Acquire image data from URLs + +Below shows an example how to bring everything together in an end-to-end pipeline. From a list of image URLs, +the example builds the `PCollection` of `ImageRequest`s that is applied to an instantiated `RequestResponseIO`, +using the `HttpImageClient` `Caller` implementation. + +{{< paragraph class="language-java" >}} +Any failures, accessible from the `Result`'s `getFailures` getter, is outputted to logs. As already discussed above, Review Comment: ```suggestion Any failures, accessible from the `Result`'s `getFailures` getter, are outputted to logs. As already discussed above, ``` Nit ########## website/www/site/content/en/documentation/io/built-in/webapis.md: ########## @@ -0,0 +1,536 @@ +--- +title: "Web Apis I/O connector" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +[Built-in I/O Transforms](/documentation/io/built-in/) + +# Web APIs I/O connector + +{{< language-switcher java >}} + +The Beam SDKs include a built-in transform, called RequestResponseIO to support reads and writes with Web APIs such as +REST or gRPC. + +Discussion below focuses on the Java SDK. Python examples will be added in the future; see tracker issue: +[#30422](https://github.com/apache/beam/issues/30422). 
Additionally, support for the Go SDK is not yet available; +see tracker issue: [#30423](https://github.com/apache/beam/issues/30423). + + +## RequestResponseIO Features + +Features this transform provides include: +* developers provide minimal code that invokes Web API endpoint +* delegate to the transform to handle request retries and exponential backoff +* optional caching of request and response associations +* optional metrics + +This guide currently focuses on the first two bullet points above, the minimal code requirements and error handling. +In the future, it may be expanded to show examples of additional features. Links to additional resources is +provided below. + +## Additional resources + +{{< paragraph class="language-java" wrap="span" >}} +* [RequestResponseIO source code](https://github.com/apache/beam/tree/master/sdks/java/io/rrio) +* [RequestResponseIO Javadoc](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html) + {{< /paragraph >}} + +## Before you start + +{{< paragraph class="language-java" >}} +To use RequestResponseIO, add the dependency to your [Gradle](https://gradle.org) `build.gradle(.kts)` or +[Maven](https://maven.apache.org/) `pom.xml` file. See +[Maven Central](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio) for available versions. +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} +Below shows an example adding the [Beam BOM](https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom) +and related dependencies such as Beam core to your `build.gradle(.kts)` file. 
+{{< /paragraph >}} + +{{< highlight java >}} +// Apache Beam BOM +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom +implementation("org.apache.beam:beam-sdks-java-bom:{{< param release_latest >}}") + +// Beam Core SDK +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core +implementation("org.apache.beam:beam-sdks-java-core") + +// RequestResponseIO dependency +// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio +implementation("org.apache.beam:beam-sdks-java-io-rrio") +{{< /highlight >}} + +{{< paragraph class="language-java" >}} +Or using Maven, add the artifact dependency to your `pom.xml` file. +{{< /paragraph >}} + +{{< highlight java >}} +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-rrio</artifactId> + <version>{{< param release_latest >}}</version> +</dependency> +{{< /highlight >}} + + +## RequestResponseIO basics + +### Minimal code + +The minimal code needed to read from or write to Web APIs is: + +{{< paragraph class="language-java" wrap="span" >}} +1. [Caller](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html) implementation. +2. Instantiate [RequestResponseIO](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html). +{{< /paragraph >}} + +#### Implementing the Caller + +{{< paragraph class="language-java" >}} +[Caller](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html) requires +only one method override: [call](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Caller.html#call-RequestT-), whose +purpose is to interact with the API, converting a request into a response. +The transform's DoFn invokes this method within its +[DoFn.ProcessElement](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html) +method. 
The transform handles everything else including repeating failed requests and exponential backoff +(discussed more below). +{{< /paragraph >}} + +{{< highlight java >}} +// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse. +class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> { + + @Override + public MyResponse call(MyRequest request) throws UserCodeExecutionException { + + // Do something with request and return the response. + + } + +} +{{< /highlight >}} + +#### Instantiate RequestResponseIO + +{{< paragraph class="language-java" >}} +Using [RequestResponseIO](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/RequestResponseIO.html) +is as simple as shown below. As mentioned, it minimally requires two parameters: the `Caller` and the expected +[Coder](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/Coder.html) of the response. (_Note: If the concept of a Beam Coder is new to you, please see the +[Apache Beam Programming Guide](/documentation/programming-guide/#data-encoding-and-type-safety) +on this subject. This guide also has an example below._) +{{< /paragraph >}} + +{{< paragraph class="language-java" >}} +The `RequestResponseIO` transform returns a [Result](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html) +that bundles any failures and the `PCollection` of successful responses. In Beam, we call this the +[additional outputs](/documentation/programming-guide/#additional-outputs) pattern, +which typically requires a bit of boilerplate but the transform takes care of it for you. 
Using the transform, +you get the success and failure `PCollection`s via +[Result::getFailures](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html#getFailures--) +and [Result::getResponses](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html#getResponses--). + +Below shows an abbreviated snippet how the transform may work in your pipeline. +{{< /paragraph >}} + +{{< highlight java >}} +// Step 1. Define the Coder for the response. +Coder<MyResponse> responseCoder = ... + +// Step 2. Build the request PCollection. +PCollection<MyRequest> requests = ... + +// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection. +Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder)); + +// Step 4a. Do something with the responses. +result.getResponses().apply( ... ); + +// Step 4b. Apply failures to a dead letter sink. +result.getFailures().apply( ... ); + +{{< /highlight >}} + +{{< paragraph >}} +`RequestResponseIO` takes care of everything else needed to invoke the `Caller` for each request. It doesn't care what +you do inside your `Caller`, whether you make raw HTTP calls or use client code. Later this guide discusses the +advantage of this design for testing. +{{< /paragraph >}} + +### API call repeats and failures + +As mentioned above, `RequestResponseIO` returns a +[Result](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/Result.html) +that bundles both the success and failure `PCollection`s resulting from you `Caller`. This section provides +a little more detail about handling failures and specifics on API call repeats with backoff. 
+ +#### Handling failures + +The failures are an +[ApiIOError](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/ApiIOError.html) +`PCollection` that you may apply to a logging transform or a transform that +saves the errors to a downstream sink for later analysis and troubleshooting. + +Since `ApiIOError` is already mapped to a Beam Schema, it has compatibility with most of Beam's existing I/O +connectors. +(_Note: If the concept of Beam Schemas is new to you, please see the +[Beam Programming Guide](/documentation/programming-guide/#schemas)._) +For example, you can easily send `ApiIOError` records to BigQuery for analysis and troubleshooting as shown +below **without** converting the records first to a +[TableRow](https://www.javadoc.io/doc/com.google.apis/google-api-services-bigquery/v2-rev20230812-2.0.0/com/google/api/services/bigquery/model/TableRow.html). + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/AdditionalSnippets.java" webapis_java_write_failures_bigquery >}} +{{< /highlight >}} + +#### API call repeats and backoff + +Prior to emitting to the failure `PCollection`, the transform performs a retry **for certain errors** +after a prescribed exponential backoff. Your `Caller` must throw specific errors, to signal the transform +to perform the retry with backoff. Throwing a +[UserCodeExecutionException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeExecutionException.html) +will immediately emit the error into the `ApiIOError` `PCollection`. 
+ +`RequestResponseIO` will attempt a retry with backoff when `Caller` throws: +* [UserCodeQuotaException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeQuotaException.html) +* [UserCodeRemoteSystemException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeRemoteSystemException.html) +* [UserCodeTimeoutException](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/UserCodeTimeoutException.html) + +After a threshold number of retries, the error is emitted into the failure `PCollection`. + +#### Testing + +Since `RequestResponseIO` doesn't care what you do inside your `Caller` implementation, this makes some testing more convenient. +Instead of relying on direct calls to a real API within some tests, consequently depending on your external resource, +you simply implement a version of your `Caller` +returning responses or throwing exceptions, according to your test logic. +For example, if you want to test a downstream step in your pipeline for a specific response, say empty records, you +could easily do so via the following. For more information on testing your Beam Pipelines, see +the [Beam Programming Guide](/documentation/pipelines/test-your-pipeline/). + +{{< highlight java >}} + +@Test +void givenEmptyResponse_thenExpectSomething() { + // Test expects PTransform underTest should do something as a result of empty records, for example. + PTransform<Iterable<String>, ?> underTest = ... + + PCollection<String> requests = pipeline.apply(Create.of("aRequest")); + IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of()); + Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder); + + PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...) + + pipeline.run(); +} + +// MockEmptyIterableResponse simulates when there are no results from the API. 
+class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> { +@Override + public Iterable<String> call(String request) throws UserCodeExecutionException { + return Collections.emptyList(); + } +} +{{< /highlight >}} + +## Practical examples + +Below shows two examples that we will bring together in an end-to-end Beam pipeline. The goal of this pipeline is to +download images and use +[Gemini on Vertex AI](https://cloud.google.com/vertex-ai/generative-ai/docs/start/quickstarts/quickstart-multimodal) +to recognize the image content. + +Note that this example does not replace our current AI/ML solutions. Please see +[Get started with AI/ML pipelines](/documentation/ml/overview/) +for more details on using Beam with AI/ML. + +### Working with HTTP calls directly + +We first need to download images. To do so, we need to make HTTP calls to the image URL and emit their content +into a `PCollection` for use with the Gemini API. The value of this example on its own is that it demonstrates +how to use `RequestResponseIO` to make raw HTTP requests. + +#### Define Caller + +We implement the `Caller`, the `HttpImageClient`, that receives an `ImageRequest` and returns an `ImageResponse`. + +_For demo purposes, the example uses a +[KV](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/KV.html) +to preserve the raw URL in the returned `ImageResponse` containing `KV`._ + +##### Abbreviated snippet + +Below shows an abbreviated version of the `HttpImageClient` showing the important parts. 
+ +{{< highlight java >}} +class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> { + + private static final HttpRequestFactory REQUEST_FACTORY = + new NetHttpTransport().createRequestFactory(); + + @Override + public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException { + + ImageRequest request = requestKV.getValue(); + GenericUrl url = new GenericUrl(request.getImageUrl()); + HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url); + HttpResponse response = imageRequest.execute(); + + return KV.of( + requestKV.getKey(), + ImageResponse + .builder() + // Build ImageResponse from HttpResponse + .build() + ); + } + +} +{{< /highlight >}} + +##### Full example + +The full implementation is shown below illustrating throwing various exceptions based on the HTTP response code. + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/HttpImageClient.java" webapis_java_image_caller >}} +{{< /highlight >}} + +#### Define request + +`ImageRequest` is the custom request we provide the `HttpImageClient`, defined in the example above, to invoke the HTTP call +that acquires the image. +{{< paragraph class="language-java" wrap="span" >}}This example happens to use [Google AutoValue](https://github.com/google/auto/blob/main/value/userguide/index.md), +but you can use any custom `Serializable` Java class as you would in any Beam `PCollection`, +including inherent Java classes such as `String`, `Double`, etc. For convenience, this example uses +`@DefaultSchema(AutoValueSchema.class)` allowing us to map our custom type to a +[Beam Schema](/documentation/programming-guide/#schemas) automatically based on its getters. 
+{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageRequest.java" webapis_java_image_request >}} +{{< /highlight >}} + +#### Define response + +`ImageResponse` is the custom response we return from the `HttpImageClient`, defined in the example above. It contains the image data +retrieved by calling the remote server with the image URL. {{< paragraph class="language-java" wrap="span" >}}Again, +this example happens to use [Google AutoValue](https://github.com/google/auto/blob/main/value/userguide/index.md), +but you can use any custom `Serializable` Java class as you would in any Beam `PCollection`, +including built-in Java classes such as `String`, `Double`, etc.{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageResponse.java" webapis_java_image_response >}} +{{< /highlight >}} + +#### Define response coder + +{{< paragraph class="language-java" >}} +`RequestResponseIO` needs the response's +[Coder](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/Coder.html) +as its second required parameter, shown in the example below. Please see the +[Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety) +for more information about Beam Coders. +{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/ImageResponseCoder.java" webapis_java_image_response_coder >}} +{{< /highlight >}} + +#### Acquire image data from URLs + +The example below brings everything together in an end-to-end pipeline. From a list of image URLs, +the example builds a `PCollection` of `ImageRequest`s and applies to it a `RequestResponseIO` instantiated +with the `HttpImageClient` `Caller` implementation. 
+ +{{< paragraph class="language-java" >}} +Any failures, accessible from the `Result`'s `getFailures` getter, are written to the logs. As already discussed above, +one could write these failures to a database or filesystem. +{{< /paragraph >}} + +{{< highlight java >}} +{{< code_sample "examples/java/webapis/src/main/java/org/apache/beam/examples/webapis/UsingHttpClientExample.java" webapis_java_http_get >}} +{{< /highlight >}} + +The pipeline output, shown below, displays a summary of each downloaded image: its URL, MIME type, and size. + +{{< highlight >}} +KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130} +KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671} +KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809} +KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375} +KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281} +KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752} +{{< /highlight >}} + +### Using API client code + +The last example demonstrated invoking HTTP requests directly. However, some API services provide +client code that one should use within the `Caller` implementation. Using client code within Beam presents +unique challenges, namely serialization. Additionally, some client code requires explicit +setup and teardown. + +{{< paragraph class="language-java" >}} +`RequestResponseIO` can handle an additional interface called `SetupTeardown` for these scenarios. + +The [SetupTeardown](https://beam.apache.org/releases/javadoc/current/org/apache/beam/io/requestresponse/SetupTeardown.html) +interface has only two methods, `setup` and `teardown`. 
+{{< /paragraph >}} + +{{< highlight java >}} +interface SetupTeardown { + void setup() throws UserCodeExecutionException; + void teardown() throws UserCodeExecutionException; +} +{{< /highlight >}} + +{{< paragraph class="language-java" >}} +The transform calls these setup and teardown methods within its DoFn's +[@Setup](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Setup.html) +and +[@Teardown](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.Teardown.html) +methods, respectively. +{{< /paragraph >}} + +The transform also handles retries with backoff, likewise depending on the thrown exception, as discussed previously +in this guide. + +#### Define Caller with SetupTeardown + +Below is +an example that adapts +[Vertex AI Gemini Java Client](https://cloud.google.com/vertex-ai/docs/generative-ai/start/quickstarts/quickstart-multimodal) +to work in a Beam pipeline using `RequestResponseIO`, adding usage of the `SetupTeardown` interface, +in addition to the required `Caller`. It has a bit more boilerplate than the simple HTTP example above. + +{{< paragraph class="language-java" >}} + +##### Abbreviated snippet + +The abbreviated snippet below shows the important parts. + +During the `setup` method is where the `GeminiAIClient` instantiates `VertexAI` and `GenerativeModel`, finally closing

Review Comment:

```suggestion
The `setup` method is where the `GeminiAIClient` instantiates `VertexAI` and `GenerativeModel`, finally closing
```

or alternately:

```suggestion
During the `setup` method, the `GeminiAIClient` instantiates `VertexAI` and `GenerativeModel`, finally closing
```

(nit)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
