gemini-code-assist[bot] commented on PR #38058: URL: https://github.com/apache/beam/pull/38058#issuecomment-4181076660
## Summary of Changes

This pull request adds automatic schema evolution to the BigQuery Storage Write API connector. When incoming data contains new fields or needs relaxed field constraints (for example, a `REQUIRED` field that is absent from a record), the connector detects the mismatch, buffers the affected records, and patches the BigQuery table's schema before writing them. Pipelines no longer need manual schema updates for these minor kinds of data evolution, so ingestion can continue without intervention.

### Highlights

* **Automatic Schema Evolution**: Implemented a mechanism that lets the BigQuery Storage Write API connector detect and apply schema updates (field additions and field relaxation) to target tables when incoming data contains unknown fields or otherwise mismatches the table schema.
* **Schema Mismatch Detection and Buffering**: Added logic to detect when an incoming record's schema differs from the BigQuery table's current schema. Records that require a schema update are buffered and reprocessed after the table schema has been patched.
* **Incremental Schema Generation and Patching**: Added utilities that build an "incremental schema" from the detected differences and use it to patch the BigQuery table. Patch calls are throttled to avoid excessive API requests.
* **Enhanced Error Collection**: Refactored `TableRowToStorageApiProto` to use an `ErrorCollector` that gathers schema conversion exceptions, enabling more granular error handling and the generation of incremental schema updates.
* **API Integration for Schema Updates**: Extended `BigQueryServices` with a `patchTableSchema` method so the Beam pipeline can update BigQuery table schemas programmatically.
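To make the two schema changes named above (field additions and field relaxation) concrete, here is a minimal Java sketch of merging an incremental schema into an existing one. It is not the PR's `UpgradeTableSchema.mergeSchemas`/`mergeFields` code: `SimpleField` and `SchemaMerger` are hypothetical stand-ins for BigQuery's `TableFieldSchema`-based logic, and the sketch assumes the incremental schema only adds new fields and relaxes `REQUIRED` fields to `NULLABLE`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical, simplified stand-in for BigQuery's TableFieldSchema. */
final class SimpleField {
  final String name;
  final String type; // e.g. "STRING", "INT64", or "RECORD" for nested fields
  final String mode; // "REQUIRED", "NULLABLE", or "REPEATED"
  final List<SimpleField> fields; // sub-fields, used only when type is "RECORD"

  SimpleField(String name, String type, String mode, List<SimpleField> fields) {
    this.name = name;
    this.type = type;
    this.mode = mode;
    this.fields = fields;
  }

  SimpleField(String name, String type, String mode) {
    this(name, type, mode, new ArrayList<>());
  }
}

/** Hypothetical merge helper; not the PR's UpgradeTableSchema implementation. */
final class SchemaMerger {
  /**
   * Merges an incremental schema into an existing one. Fields that exist only in the
   * update are appended, REQUIRED fields that the update marks NULLABLE are relaxed,
   * and RECORD fields are merged recursively. Existing fields are never removed or
   * retyped, so the result only widens the schema. Inputs are not modified.
   */
  static List<SimpleField> mergeFields(List<SimpleField> existing, List<SimpleField> update) {
    // Keyed by lower-cased name because BigQuery column names are case-insensitive;
    // LinkedHashMap keeps the existing column order and appends new columns at the end.
    Map<String, SimpleField> merged = new LinkedHashMap<>();
    for (SimpleField field : existing) {
      merged.put(field.name.toLowerCase(Locale.ROOT), field);
    }
    for (SimpleField candidate : update) {
      String key = candidate.name.toLowerCase(Locale.ROOT);
      SimpleField current = merged.get(key);
      if (current == null) {
        merged.put(key, candidate); // field addition
        continue;
      }
      boolean relax = "REQUIRED".equals(current.mode) && "NULLABLE".equals(candidate.mode);
      String mode = relax ? "NULLABLE" : current.mode; // field relaxation
      List<SimpleField> children =
          "RECORD".equals(current.type)
              ? mergeFields(current.fields, candidate.fields)
              : current.fields;
      merged.put(key, new SimpleField(current.name, current.type, mode, children));
    }
    return new ArrayList<>(merged.values());
  }
}
```

Because existing columns keep their position and type, merging the same incremental schema twice yields the same result, which is convenient when several workers observe the same new field.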
<details>
<summary><b>Changelog</b></summary>

* **examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java**
  * Updated class declaration with an extra space for formatting consistency.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java**
  * Imported `Preconditions` for utility functions.
  * Modified `encodeUnknownFields` to pass `ErrorCollector.DONT_COLLECT` to `messageFromTableRow`.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java**
  * Added validation to prevent schema update options from being used with auto schema update, Beam schemas, or direct proto writes.
  * Passed `elementCoder` and `destinationCoder` to the `StorageApiLoads` constructor.
  * Updated `StorageApiDynamicDestinationsTableRow` instantiation to include `schemaUpdateOptions`.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java**
  * Added `patchTableSchema` method to the `BigQueryRpc` interface for updating table schemas.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java**
  * Implemented the `patchTableSchema` method in `BigQueryRpcImpl` to update BigQuery table schemas.
  * Added `DONT_RETRY_INVALID_ARG_OR_PRECONDITION` retry function for specific API errors.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/MergeSchemaCombineFn.java**
  * Added new class `MergeSchemaCombineFn` which extends `Combine.CombineFn` to merge BigQuery `TableSchema` objects.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java**
  * Imported `isPayloadSchemaOutOfDate` and `Descriptors`.
  * Added `schemaMismatchSeen` field to `Value` and `SplittingIterable`.
  * Updated constructor to accept `getCurrentTableSchemaHash` and `getCurrentTableSchemaDescriptor` suppliers.
  * Modified `next` method to detect and track schema mismatches using `isPayloadSchemaOutOfDate`.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java**
  * Refactored `expand` method to integrate schema update logic, including new `PatchTableSchemaDoFn` and `SchemaUpdateHoldingFn`.
  * Added new output tags (`patchTableSchemaTag`, `elementsWaitingForSchemaTag`) for schema patching and buffering elements.
  * Introduced `SchemaUpdateHoldingFn` to buffer elements awaiting schema updates and `PatchTableSchemaDoFn` to apply schema patches.
  * Modified `ConvertMessagesDoFn` to use an `ErrorCollector` for schema conversion errors and to output elements for schema patching or retry.
  * Added `BufferedCollectorInformation` inner class to track schema errors per destination.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java**
  * Added `getSchemaHash` and `updateSchemaFromTable` methods to the `MessageConverter` interface.
  * Modified `toMessage` method in `MessageConverter` to accept an `ErrorCollector` parameter.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java**
  * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
  * Updated `toMessage` method to accept `collectedExceptions` parameter.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java**
  * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
  * Updated `toMessage` method to accept `collectedExceptions` parameter.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java**
  * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
  * Updated `toMessage` method to accept `collectedExceptions` parameter.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java**
  * Introduced `SchemaUpgradingTableRowConverter` to manage schema updates for TableRow conversions.
  * Added `schemaUpdateOptions` parameter to the constructor.
  * Implemented `getSchemaHash` and `updateSchemaFromTable` methods.
  * Modified `toMessage` method to accept `collectedExceptions` and include schema hash in the payload.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java**
  * Added `elementCoder` and `destinationCoder` to the constructor.
  * Updated `StorageApiConvertMessages` instantiation to pass `elementCoder` and `destinationCoder`.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java**
  * Added `getSchemaHash` and `setSchemaHash` methods to `StorageApiWritePayload` for tracking schema versions.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java**
  * Imported `isPayloadSchemaOutOfDate` for schema mismatch detection.
  * Added schema mismatch detection logic within `addMessage` and `flush` methods.
  * Updated `SplittingIterable` constructor call to pass schema hash and descriptor suppliers.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java**
  * Added schema mismatch detection logic within `process` method.
  * Updated `SplittingIterable` constructor call to pass schema hash and descriptor suppliers.
  * Refined error handling for append failures, including closing clients for non-quota errors.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java**
  * Imported `toSet` for stream collectors.
  * Introduced `ErrorCollector` class to gather schema conversion exceptions.
  * Added new `SchemaConversionException` types: `SchemaTooNarrowException` and `SchemaMissingRequiredFieldException`.
  * Implemented schema hashing logic (`tableSchemaHash`) for `TableSchema` and `TableFieldSchema`.
  * Modified `messageFromMap` and `messageFromTableRow` to accept and utilize `ErrorCollector` for schema evolution handling.
  * Updated `messageValueFromFieldValue` and `singularFieldToProtoValue` to propagate `ErrorCollector` and handle missing required fields.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java**
  * Added `putSchema` method to explicitly set a schema in the cache.
  * Modified `refreshSchema` to use `putIfAbsent` for `tablesToRefresh` to avoid overwriting existing refresh requests.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java**
  * Added new class `UpgradeTableSchema` to encapsulate schema upgrade logic.
  * Implemented `newErrorCollector` to create an `ErrorCollector` instance.
  * Provided `getIncrementalSchema` to construct a new schema based on collected errors and an old schema.
  * Implemented `mergeSchemas` and `mergeFields` for combining BigQuery table schemas.
  * Added `isPayloadSchemaOutOfDate` to check for schema mismatches between payload and current table schema.
  * Included `hasUnknownFields` to recursively check for unknown fields in a protobuf message.
* **sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java**
  * Added `getStorageException` helper for creating Storage API exceptions.
  * Modified `appendRows` to return `Exceptions.StorageException` instead of throwing `RuntimeException`.
  * Implemented `patchTableSchema` to simulate BigQuery schema updates.
  * Added `checkSchemaChanges` and `checkSchemaChangesProtos` to validate schema evolution rules.
  * Removed check for unknown fields in `appendRows` to align with new schema evolution behavior.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java**
  * Added `testAutoPatchTableSchemaTest` to verify automatic schema patching functionality.
* **sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java**
  * Updated calls to `messageFromTableRow` to include the new `ErrorCollector` parameter.

</details>

<details>
<summary><b>Activity</b></summary>

* reuvenlax requested a Gemini review.
* gemini-code-assist[bot] reported an error creating the summary.
* reuvenlax requested a Gemini summary again.

</details>
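Several changelog entries (`tableSchemaHash`, `StorageApiWritePayload.getSchemaHash`, `isPayloadSchemaOutOfDate`, `SchemaUpdateHoldingFn`) revolve around tagging each converted payload with a hash of the schema it was built against and holding back payloads whose hash no longer matches. The Java sketch below illustrates that idea under stated assumptions: `SchemaHashing`, `Payload`, and `SchemaAwareWriter` are hypothetical names, the hash is SHA-256 over `name:type:mode` strings (the PR's actual hashing scheme is not described in this summary), and mismatched payloads are simply buffered until the schema changes, whereas the real transforms may also refresh stream writers or re-convert rows.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: a deterministic hash over ordered "name:type:mode" columns. */
final class SchemaHashing {
  static String schemaHash(List<String> columns) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      for (String column : columns) {
        digest.update(column.getBytes(StandardCharsets.UTF_8));
        digest.update((byte) 0); // separator keeps ["ab", "c"] distinct from ["a", "bc"]
      }
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is required on every Java platform", e);
    }
  }
}

/** Hypothetical payload: a serialized row plus the hash of the schema used to convert it. */
final class Payload {
  final String row;
  final String schemaHash;

  Payload(String row, String schemaHash) {
    this.row = row;
    this.schemaHash = schemaHash;
  }
}

/** Hypothetical writer that appends matching payloads and holds mismatched ones. */
final class SchemaAwareWriter {
  private String currentSchemaHash;
  private final List<Payload> written = new ArrayList<>();
  private final List<Payload> held = new ArrayList<>();

  SchemaAwareWriter(String currentSchemaHash) {
    this.currentSchemaHash = currentSchemaHash;
  }

  void offer(Payload payload) {
    if (payload.schemaHash.equals(currentSchemaHash)) {
      written.add(payload); // stands in for an append to the write stream
    } else {
      held.add(payload); // schema mismatch: wait for the table schema to change
    }
  }

  /** Records the new table schema hash and returns the held payloads for reprocessing. */
  List<Payload> onSchemaUpdated(String newSchemaHash) {
    currentSchemaHash = newSchemaHash;
    List<Payload> released = new ArrayList<>(held);
    held.clear();
    return released;
  }

  int writtenCount() {
    return written.size();
  }

  int heldCount() {
    return held.size();
  }
}
```

In this sketch, payloads returned by `onSchemaUpdated` would be converted again against the updated schema and offered back to the writer; comparing hashes rather than full schemas keeps the per-record check cheap.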
