pandasanjay opened a new pull request, #35197:
URL: https://github.com/apache/beam/pull/35197
**Description:**
This pull request introduces a new `EnrichmentSourceHandler` for Apache
Beam, `BigQueryStorageEnrichmentHandler`, designed to leverage the Google Cloud
BigQuery Storage Read API for efficient data enrichment. This handler provides
a high-performance alternative to traditional SQL-based BigQuery lookups within
Beam pipelines.
**Motivation and Context:**
Enriching data by joining PCollection elements with data stored in BigQuery
is a common use case. While existing methods often rely on executing SQL
queries, the BigQuery Storage Read API offers a more direct and typically
faster way to retrieve data, especially for large volumes or when fine-grained
row-level access is needed. This handler aims to:
* Improve the performance of BigQuery enrichments.
* Reduce BigQuery query costs associated with SQL execution for enrichment
tasks.
* Provide more flexible and programmatic control over data fetching and
filtering.
**Key Features and Improvements:**
The `BigQueryStorageEnrichmentHandler` offers several enhancements:
* **Efficient Data Retrieval:** Utilizes the BigQuery Storage Read API for
significantly faster data reads compared to SQL queries, especially for bulk
lookups. Data is read in Apache Arrow format, minimizing
serialization/deserialization overhead.
* **Flexible Filtering:**
* Supports static filter templates via `row_restriction_template`.
* Allows dynamic, per-element filter string generation using
`row_restriction_template_fn`.
* **Advanced Keying and Value Extraction:**
* `fields`: Specifies input `beam.Row` fields for generating join keys
and for use in filter templates.
* `additional_condition_fields`: Allows using input fields for
filtering *without* including them in the join key.
* `condition_value_fn`: Provides complete control over generating the
dictionary of values used for both filtering and join key creation.
* **Field Renaming/Aliasing:** Supports aliasing of selected BigQuery
columns (e.g., `original_col as alias_col` in `column_names`) to prevent naming
conflicts in the enriched `beam.Row`.
* **Batching Support:** Groups multiple input elements to make fewer
`CreateReadSession` calls, reducing API overhead. Batch size and duration are
configurable (`min_batch_size`, `max_batch_size`, `max_batch_duration_secs`).
* **Parallel Stream Reading:** (Experimental) Employs a
`ThreadPoolExecutor` to read data from multiple streams of a BigQuery Read
Session in parallel, potentially improving data fetching throughput.
Concurrency is configurable via `max_parallel_streams`.
* **Custom Row Selection:** Includes a `latest_value_selector` callback
that allows users to define custom logic for selecting the desired row when
multiple BigQuery rows match a single input request (e.g., picking the record
with the most recent timestamp). `primary_keys` can be used by this selector.
* **Automatic Client Management:** Manages the lifecycle of the
`BigQueryReadClient`.
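
The filtering, aliasing, and row-selection features listed above can be illustrated with a minimal pure-Python sketch. It is not the handler's actual code: the helper names (`render_row_restriction`, `parse_column_names`, `apply_aliases`, `pick_latest`) and the `updated_at` timestamp field are hypothetical, and only the parameter names `row_restriction_template`, `column_names`, and `latest_value_selector` come from the description.

```python
from typing import Any, Dict, List, Optional, Tuple


def render_row_restriction(template: str, values: Dict[str, Any]) -> str:
    """Fill a filter template such as "id = {id}" with per-element values.

    Assumption: the template uses str.format-style placeholders and quotes
    string literals itself (e.g. region = '{region}').
    """
    return template.format(**values)


def parse_column_names(column_names: List[str]) -> Tuple[List[str], Dict[str, str]]:
    """Split entries like "original_col as alias_col" into the fields to
    select from BigQuery and a mapping from original name to alias."""
    selected: List[str] = []
    aliases: Dict[str, str] = {}
    for entry in column_names:
        parts = entry.split()
        if len(parts) == 3 and parts[1].lower() == "as":
            selected.append(parts[0])
            aliases[parts[0]] = parts[2]
        else:
            selected.append(entry.strip())
    return selected, aliases


def apply_aliases(row: Dict[str, Any], aliases: Dict[str, str]) -> Dict[str, Any]:
    """Rename fetched columns so they do not clash with input field names."""
    return {aliases.get(name, name): value for name, value in row.items()}


def pick_latest(rows: List[Dict[str, Any]],
                ts_field: str = "updated_at") -> Optional[Dict[str, Any]]:
    """Example selector: keep the row with the greatest timestamp value."""
    if not rows:
        return None
    return max(rows, key=lambda r: r[ts_field])
```

A selector shaped like `pick_latest` is the kind of callback that could be passed as `latest_value_selector`. Because the rendered filter is sent to BigQuery as a row restriction, values taken from untrusted input should be validated before they are substituted into the template.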
**Advantages over Traditional SQL-based BigQuery Enrichment:**
* **Performance:** Direct access to table storage via the Storage Read API
typically bypasses the SQL query processing engine, leading to lower latency
and higher throughput, especially for fetching many individual rows or large
data segments.
* **Cost Efficiency:** Reading data via the Storage API can be more
cost-effective than running many small SQL queries: Storage Read API pricing is
based on bytes read, while query pricing is based on bytes processed
(on-demand) or on reserved slot capacity.
* **Scalability:** The streaming nature of the Storage Read API is
well-suited for scalable data processing in Beam.
* **Reduced Query Complexity:** For simple lookups, it avoids the need to
construct and manage SQL query strings dynamically.
**Documentation:**
Comprehensive documentation for this handler, including usage examples,
parameter descriptions, features, and limitations, has been added in
[`docs/bigquery_storage_enrichment_handler.md`](https://github.com/pandasanjay/beam-custom-implementation/blob/main/docs/bigquery_storage_enrichment_handler.md).
**Implementation Details:**
The handler
(`sdk/python/transforms/enrichment_handlers/bigquery_storage_read.py`) manages
`BigQueryReadClient` instances, constructs `ReadSession` requests with
appropriate row restrictions and selected fields, and processes the resulting
Arrow record batches. It integrates with Beam's `Enrichment` transform and
provides batching and cache-key generation.
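
As a rough illustration of the batching and parallel-read flow, the sketch below combines per-element filters into one row restriction, reads several streams concurrently, and routes fetched rows back to the requests that asked for them. It is an assumption about how such a handler could work, not the PR's implementation: all function names are hypothetical, rows are modelled as plain dicts instead of Arrow record batches, and `read_stream` stands in for whatever callable actually reads one stream of a read session.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable, List, Tuple

Row = Dict[str, Any]


def combine_restrictions(restrictions: Iterable[str]) -> str:
    """OR together the per-element filters of a batch so that a single
    read session can serve every element (duplicates are dropped)."""
    unique = list(dict.fromkeys(restrictions))
    return " OR ".join(f"({r})" for r in unique)


def join_key(record: Row, fields: List[str]) -> Tuple[Any, ...]:
    """Build a hashable key from the configured join fields."""
    return tuple(record[f] for f in fields)


def read_streams_in_parallel(stream_names: List[str],
                             read_stream: Callable[[str], List[Row]],
                             max_parallel_streams: int = 4) -> List[Row]:
    """Read every stream with a thread pool and flatten the results.

    pool.map returns results in the order of stream_names.
    """
    with ThreadPoolExecutor(max_workers=max_parallel_streams) as pool:
        per_stream = pool.map(read_stream, stream_names)
        return [row for rows in per_stream for row in rows]


def match_rows(requests: List[Row], fetched: List[Row],
               fields: List[str]) -> List[Tuple[Row, List[Row]]]:
    """Pair each request with the fetched rows that share its join key."""
    by_key: Dict[Tuple[Any, ...], List[Row]] = defaultdict(list)
    for row in fetched:
        by_key[join_key(row, fields)].append(row)
    return [(req, by_key.get(join_key(req, fields), [])) for req in requests]
```

Each request can end up with zero, one, or several matching rows; the several-rows case is where a selector such as `latest_value_selector` would apply.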
**Testing Considerations:**
* Unit tests for key generation, filter construction, and data processing
logic.
* Integration tests against a live BigQuery instance.
* Performance benchmarks comparing against SQL-based handlers.
This handler provides a powerful and efficient way to enrich data in Apache
Beam pipelines using BigQuery.
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.