gemini-code-assist[bot] commented on code in PR #35952:
URL: https://github.com/apache/beam/pull/35952#discussion_r2322407191
##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -522,6 +548,241 @@ def expand_leaf_transform(spec, scope):
f'{type(outputs)}')
+def expand_output_schema_transform(spec, outputs, error_handling_spec):
+ """Applies a `Validate` transform to the output of another transform.
+
+ This function is called when an `output_schema` is defined on a transform.
+ It wraps the original transform's output(s) with a `Validate` transform
+ to ensure the data conforms to the specified schema.
+
+ If the original transform has error handling configured, validation errors
+ will be routed to the specified error output. If not, validation failures
+ will cause the pipeline to fail.
+
+ Args:
+ spec (dict): The `output_schema` specification from the YAML config.
+ outputs (beam.PCollection or dict[str, beam.PCollection]): The output(s)
+ from the transform to be validated.
+ error_handling_spec (dict): The `error_handling` configuration from the
+ original transform.
+
+ Returns:
+ The validated PCollection(s). If error handling is enabled, this will be a
+ dictionary containing the 'good' output and any error outputs.
+
+ Raises:
+ ValueError: If `error_handling` is incorrectly specified within the
+ `output_schema` spec itself, or if the main output of a multi-output
+ transform cannot be determined.
+ """
+ if 'error_handling' in spec:
+ raise ValueError(
+ 'error_handling config is not supported directly in '
+ 'the output_schema. Please use error_handling config in '
+ 'the transform.')
+
+ # Strip metadata such as __line__ and __uuid__ as these will interfere with
+ # the validation downstream.
+ clean_schema = SafeLineLoader.strip_metadata(spec)
+
+ # If no error handling is specified for the main transform, warn the user
+ # that the pipeline may fail if any output data fails the output schema
+ # validation.
+ if not error_handling_spec:
+ _LOGGER.warning("Output_schema config is attached to a transform that has "\
+ "no error_handling config specified. Any failures validating on output" \
+ "schema will fail the pipeline unless the user specifies an" \
+ "error_handling config on a capable transform or the user can remove the" \
+ "output_schema config on this transform and add a ValidateWithSchema " \
+ "transform downstream of the current transform.")
Review Comment:

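The warning message is built from adjacent string literals, and several of
them are missing a trailing space. As written, the message renders
`outputschema`, `anerror_handling`, and `theoutput_schema`. Adding the
trailing spaces fixes the text users see when this warning fires.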
```python
_LOGGER.warning(
"Output_schema config is attached to a transform that has "
"no error_handling config specified. Any failures validating on output "
"schema will fail the pipeline unless the user specifies an "
"error_handling config on a capable transform or the user can remove the "
"output_schema config on this transform and add a ValidateWithSchema "
"transform downstream of the current transform.")
```
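For context, here is a minimal standalone sketch of why the space matters. Python joins adjacent string literals with nothing inserted between them, so a fragment that ends without a space runs into the next one. The variable names `broken` and `fixed` are illustrative only and do not appear in the PR.

```python
# Adjacent string literals are concatenated as-is; no separator is added.
broken = ("Any failures validating on output"
          "schema will fail the pipeline")

# A trailing space on the first fragment keeps the words apart.
fixed = ("Any failures validating on output "
         "schema will fail the pipeline")

print(broken)
print(fixed)
```

The same applies to the backslash-continued form used in the diff, since the continuation only joins the source lines and the literals are still concatenated directly.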
##########
website/www/site/content/en/documentation/sdks/yaml-schema.md:
##########
@@ -0,0 +1,119 @@
+---
+type: languages
+title: "Apache Beam YAML Schema"
+---
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Beam YAML Schema
+
+As pipelines grow in size and complexity, it becomes more common to encounter
+data that is malformed, doesn't meet preconditions, or otherwise causes issues
+during processing.
+
+Beam YAML helps the user detect and capture these issues by using the optional
+`output_schema` configuration, which is available for any transform in the YAML
+SDK. For example, the following code creates a few "good" records and specifies
+that the output schema from the `Create` transform should have records that
+follow the expected schema: `sdk` as a string and `year` as an integer.
+
+```yaml
+pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {sdk: MapReduce, year: 2004}
+ - {sdk: MillWheel, year: 2008}
+ output_schema:
+ type: object
+ properties:
+ sdk:
+ type: string
+ year:
+ type: integer
+ - type: AssertEqual
+ config:
+ elements:
+ - {sdk: MapReduce, year: 2004}
+ - {sdk: MillWheel, year: 2008}
+```
+
+However, a user will more likely want to detect and handle schema errors. If a
+transform has a built-in error_handling configuration, the user can specify that
+error_handling configuration and any errors found will be appended to the
+transform error_handling output. For example, the following code will
+create a few "good" and "bad" records with a specified schema of `user` as a
+string and `timestamp` as a boolean. The `alice` row will fail in the standard
+way because of not being an integer for the AssignTimestamps transform, while
+the `bob` row will fail because after the AssignTimestamp transformation, the
+output row will have the timestamp as an integer when it should be a boolean.
+
+
+```yaml
+pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateVisits
+ config:
+ elements:
+ - {user: alice, timestamp: "not-valid"}
+ - {user: bob, timestamp: 3}
+ - type: AssignTimestamps
+ input: CreateVisits
+ config:
+ timestamp: timestamp
+ error_handling:
+ output: invalid_rows
+ output_schema:
+ type: object
+ properties:
+ user:
+ type: string
+ timestamp:
+ type: boolean
+ - type: MapToFields
+ name: ExtractInvalidTimestamp
+ input: AssignTimestamps.invalid_rows
+ config:
+ language: python
+ fields:
+ user: "element.user"
+ timestamp: "element.timestamp"
+ - type: AssertEqual
+ input: ExtractInvalidTimestamp
+ config:
+ elements:
+ - {user: "alice", timestamp: "not-valid"}
+ - {user: bob, timestamp: 3}
+ - type: AssertEqual
+ input: AssignTimestamps
+ config:
+ elements: []
+```
+
+WARNING: If a transform doesn't have the error_handling configuration available
+and a user chooses to use this optional output_schema feature, any failures
+found will result in the entire pipeline failing. If the user would still like
+to have somekind of output schema validation, please use the ValidateWithSchema
+transform instead.
Review Comment:

There's a typo in the documentation. `somekind` should be `some kind`.
```suggestion
to have some kind of output schema validation, please use the ValidateWithSchema
transform instead.
```