gemini-code-assist[bot] commented on PR #38058:
URL: https://github.com/apache/beam/pull/38058#issuecomment-4181076660

   ## Summary of Changes
   
   This pull request adds a significant enhancement to the BigQuery Storage 
Write API connector: it can now adapt automatically to evolving data schemas. 
When records arrive with new fields or relaxed field constraints, the connector 
detects the change, buffers the affected records, and dynamically patches the 
BigQuery table's schema before retrying the writes. This removes the need for 
manual schema management in response to minor data evolution and keeps 
ingestion pipelines flowing uninterrupted.
   
   ### Highlights
   
   * **Automatic Schema Evolution**: Implemented a mechanism for BigQuery 
Storage Write API to automatically detect and apply schema updates (field 
additions and relaxation) to target tables when unknown fields or schema 
mismatches are encountered in incoming data.
   * **Schema Mismatch Detection and Buffering**: Introduced logic to identify 
when an incoming record's schema differs from the BigQuery table's current 
schema. Records requiring schema updates are buffered and reprocessed after the 
table schema has been patched.
   * **Incremental Schema Generation and Patching**: Developed utilities to 
generate an 'incremental schema' based on detected differences, which is then 
used to patch the BigQuery table. This process is throttled to prevent 
excessive API calls.
   * **Enhanced Error Collection**: Refactored `TableRowToStorageApiProto` to 
include an `ErrorCollector` that gathers schema conversion exceptions, allowing 
for more granular error handling and the generation of incremental schema 
updates.
   * **API Integration for Schema Updates**: Extended `BigQueryServices` with a 
`patchTableSchema` method, enabling programmatic updates to BigQuery table 
schemas directly from the Beam pipeline.
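   
   The detect → buffer → patch → retry flow described in the highlights can be 
sketched in miniature as follows. This is NOT the Beam implementation: plain 
name→type maps stand in for `TableSchema`, and the `"STRING"` default type is a 
made-up placeholder for the connector's real type derivation.

```java
import java.util.*;

public class SchemaEvolutionSketch {
  static final Map<String, String> tableSchema = new HashMap<>(Map.of("id", "INT64"));
  static final List<Map<String, Object>> buffered = new ArrayList<>();
  static final List<Map<String, Object>> written = new ArrayList<>();

  // A record needs a schema update when it carries fields the table lacks.
  static boolean hasUnknownFields(Map<String, Object> record) {
    return !tableSchema.keySet().containsAll(record.keySet());
  }

  static void write(Map<String, Object> record) {
    if (hasUnknownFields(record)) {
      buffered.add(record);      // hold the record until the table is patched
      patchTableSchema(record);  // widen the table schema (additions only here)
      flushBuffered();           // reprocess everything that was waiting
    } else {
      written.add(record);
    }
  }

  static void patchTableSchema(Map<String, Object> record) {
    for (String field : record.keySet()) {
      tableSchema.putIfAbsent(field, "STRING"); // placeholder type inference
    }
  }

  static void flushBuffered() {
    written.addAll(buffered);
    buffered.clear();
  }

  public static void main(String[] args) {
    write(Map.<String, Object>of("id", 1));
    write(Map.<String, Object>of("id", 2, "note", "new field"));
  }
}
```

   In the real connector the patch step is throttled and the buffered elements 
are held behind the schema update rather than flushed synchronously.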
   
   
   <details>
   <summary><b>Changelog</b></summary>
   
   * 
**examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java**
       * Whitespace-only change to the class declaration (an extra space; no 
functional impact).
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java**
       * Imported `Preconditions` for utility functions.
       * Modified `encodeUnknownFields` to pass `ErrorCollector.DONT_COLLECT` 
to `messageFromTableRow`.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java**
       * Added validation to prevent schema update options from being used with 
auto schema update, Beam schemas, or direct proto writes.
       * Passed `elementCoder` and `destinationCoder` to the `StorageApiLoads` 
constructor.
       * Updated `StorageApiDynamicDestinationsTableRow` instantiation to 
include `schemaUpdateOptions`.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java**
       * Added `patchTableSchema` method to the `BigQueryRpc` interface for 
updating table schemas.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java**
       * Implemented the `patchTableSchema` method in `BigQueryRpcImpl` to 
update BigQuery table schemas.
       * Added `DONT_RETRY_INVALID_ARG_OR_PRECONDITION` retry function for 
specific API errors.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/MergeSchemaCombineFn.java**
       * Added new class `MergeSchemaCombineFn` which extends 
`Combine.CombineFn` to merge BigQuery `TableSchema` objects.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java**
       * Imported `isPayloadSchemaOutOfDate` and `Descriptors`.
       * Added `schemaMismatchSeen` field to `Value` and `SplittingIterable`.
       * Updated constructor to accept `getCurrentTableSchemaHash` and 
`getCurrentTableSchemaDescriptor` suppliers.
       * Modified `next` method to detect and track schema mismatches using 
`isPayloadSchemaOutOfDate`.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java**
       * Refactored `expand` method to integrate schema update logic, including 
new `PatchTableSchemaDoFn` and `SchemaUpdateHoldingFn`.
       * Added new output tags (`patchTableSchemaTag`, 
`elementsWaitingForSchemaTag`) for schema patching and buffering elements.
       * Introduced `SchemaUpdateHoldingFn` to buffer elements awaiting schema 
updates and `PatchTableSchemaDoFn` to apply schema patches.
       * Modified `ConvertMessagesDoFn` to use an `ErrorCollector` for schema 
conversion errors and to output elements for schema patching or retry.
       * Added `BufferedCollectorInformation` inner class to track schema 
errors per destination.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java**
       * Added `getSchemaHash` and `updateSchemaFromTable` methods to the 
`MessageConverter` interface.
       * Modified `toMessage` method in `MessageConverter` to accept an 
`ErrorCollector` parameter.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java**
       * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
       * Updated `toMessage` method to accept `collectedExceptions` parameter.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java**
       * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
       * Updated `toMessage` method to accept `collectedExceptions` parameter.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java**
       * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
       * Updated `toMessage` method to accept `collectedExceptions` parameter.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java**
       * Introduced `SchemaUpgradingTableRowConverter` to manage schema updates 
for TableRow conversions.
       * Added `schemaUpdateOptions` parameter to the constructor.
       * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
       * Modified `toMessage` method to accept `collectedExceptions` and 
include schema hash in the payload.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java**
       * Added `elementCoder` and `destinationCoder` to the constructor.
       * Updated `StorageApiConvertMessages` instantiation to pass 
`elementCoder` and `destinationCoder`.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java**
       * Added `getSchemaHash` and `setSchemaHash` methods to 
`StorageApiWritePayload` for tracking schema versions.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java**
       * Imported `isPayloadSchemaOutOfDate` for schema mismatch detection.
       * Added schema mismatch detection logic within `addMessage` and `flush` 
methods.
       * Updated `SplittingIterable` constructor call to pass schema hash and 
descriptor suppliers.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java**
       * Added schema mismatch detection logic within `process` method.
       * Updated `SplittingIterable` constructor call to pass schema hash and 
descriptor suppliers.
       * Refined error handling for append failures, including closing clients 
for non-quota errors.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java**
       * Imported `toSet` for stream collectors.
       * Introduced `ErrorCollector` class to gather schema conversion 
exceptions.
       * Added new `SchemaConversionException` types: 
`SchemaTooNarrowException` and `SchemaMissingRequiredFieldException`.
       * Implemented schema hashing logic (`tableSchemaHash`) for `TableSchema` 
and `TableFieldSchema`.
       * Modified `messageFromMap` and `messageFromTableRow` to accept and 
utilize `ErrorCollector` for schema evolution handling.
       * Updated `messageValueFromFieldValue` and `singularFieldToProtoValue` 
to propagate `ErrorCollector` and handle missing required fields.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java**
       * Added `putSchema` method to explicitly set a schema in the cache.
       * Modified `refreshSchema` to use `putIfAbsent` for `tablesToRefresh` to 
avoid overwriting existing refresh requests.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java**
       * Added new class `UpgradeTableSchema` to encapsulate schema upgrade 
logic.
       * Implemented `newErrorCollector` to create an `ErrorCollector` instance.
       * Provided `getIncrementalSchema` to construct a new schema based on 
collected errors and an old schema.
       * Implemented `mergeSchemas` and `mergeFields` for combining BigQuery 
table schemas.
       * Added `isPayloadSchemaOutOfDate` to check for schema mismatches 
between payload and current table schema.
       * Included `hasUnknownFields` to recursively check for unknown fields in 
a protobuf message.
   * 
**sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java**
       * Added `getStorageException` helper for creating Storage API exceptions.
       * Modified `appendRows` to return `Exceptions.StorageException` instead 
of throwing `RuntimeException`.
       * Implemented `patchTableSchema` to simulate BigQuery schema updates.
       * Added `checkSchemaChanges` and `checkSchemaChangesProtos` to validate 
schema evolution rules.
       * Removed check for unknown fields in `appendRows` to align with new 
schema evolution behavior.
   * 
**sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java**
       * Added `testAutoPatchTableSchemaTest` to verify automatic schema 
patching functionality.
   * 
**sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java**
       * Updated calls to `messageFromTableRow` to include the new 
`ErrorCollector` parameter.
   </details>
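
   The `DONT_RETRY_INVALID_ARG_OR_PRECONDITION` retry function in 
`BigQueryServicesImpl` can be sketched as a predicate over error codes. The 
status-code strings below are illustrative placeholders; the real code inspects 
the actual exception returned by the BigQuery API.

```java
import java.util.Set;
import java.util.function.Predicate;

public class RetryPolicySketch {
  // Errors a retry cannot fix: the request itself is malformed, or a
  // precondition (such as the table schema) must change first.
  static final Set<String> NON_RETRYABLE = Set.of("INVALID_ARGUMENT", "FAILED_PRECONDITION");

  static final Predicate<String> SHOULD_RETRY = code -> !NON_RETRYABLE.contains(code);
}
```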
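
   The `MergeSchemaCombineFn` entry in the changelog boils down to a field-wise 
union of schemas. A minimal sketch, using flat name→type maps in place of 
BigQuery `TableSchema` objects (the real combine function also handles nested 
RECORD fields and mode relaxation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSchemaSketch {
  // Merge two schemas: every field from either input appears in the result.
  static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
    Map<String, String> merged = new LinkedHashMap<>(a);
    b.forEach(merged::putIfAbsent); // on a name clash, keep the accumulated type
    return merged;
  }
}
```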
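
   The `tableSchemaHash` logic in `TableRowToStorageApiProto` amounts to a 
deterministic fingerprint of the schema, so a payload can carry a cheap schema 
version and writers can tell whether it was encoded against the table's current 
schema. A hedged sketch over `{name, type, mode}` triples (the real hash walks 
`TableSchema`/`TableFieldSchema` objects):

```java
import java.util.Arrays;
import java.util.List;

public class SchemaHashSketch {
  // Each field is {name, type, mode}; order matters, as it does in a schema.
  static int schemaHash(List<String[]> fields) {
    int h = 1;
    for (String[] f : fields) {
      h = 31 * h + Arrays.hashCode(f);
    }
    return h;
  }
}
```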
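
   Finally, `getIncrementalSchema` in `UpgradeTableSchema` can be sketched as: 
take the unknown-field names gathered by the error collector and keep only 
those the old schema lacks. The `"STRING"` default type here is a placeholder; 
the real code derives field types and modes from the collected exceptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class IncrementalSchemaSketch {
  // Build just the set of new fields that need to be patched onto the table.
  static Map<String, String> incrementalSchema(
      Map<String, String> oldSchema, Set<String> unknownFields) {
    Map<String, String> incremental = new LinkedHashMap<>();
    for (String f : unknownFields) {
      if (!oldSchema.containsKey(f)) {
        incremental.put(f, "STRING"); // placeholder type
      }
    }
    return incremental;
  }
}
```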
   
   <details>
   <summary><b>Activity</b></summary>
   
   * reuvenlax requested a Gemini review.
   * gemini-code-assist[bot] reported an error creating the summary.
   * reuvenlax requested a Gemini summary again.
   </details>
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
