damccorm commented on code in PR #35952: URL: https://github.com/apache/beam/pull/35952#discussion_r2330425269
########## sdks/python/apache_beam/yaml/yaml_transform.py: ########## @@ -522,6 +549,242 @@ def expand_leaf_transform(spec, scope): f'{type(outputs)}') +def expand_output_schema_transform(spec, outputs, error_handling_spec): + """Applies a `Validate` transform to the output of another transform. + + This function is called when an `output_schema` is defined on a transform. + It wraps the original transform's output(s) with a `Validate` transform + to ensure the data conforms to the specified schema. + + If the original transform has error handling configured, validation errors + will be routed to the specified error output. If not, validation failures + will cause the pipeline to fail. + + Args: + spec (dict): The `output_schema` specification from the YAML config. + outputs (beam.PCollection or dict[str, beam.PCollection]): The output(s) + from the transform to be validated. + error_handling_spec (dict): The `error_handling` configuration from the + original transform. + + Returns: + The validated PCollection(s). If error handling is enabled, this will be a + dictionary containing the 'good' output and any error outputs. + + Raises: + ValueError: If `error_handling` is incorrectly specified within the + `output_schema` spec itself, or if the main output of a multi-output + transform cannot be determined. + """ + if 'error_handling' in spec: + raise ValueError( + 'error_handling config is not supported directly in ' + 'the output_schema. Please use error_handling config in ' + 'the transform, if possible, or use ValidateWithSchema transform ' + 'instead.') + + # Strip metadata such as __line__ and __uuid__ as these will interfere with + # the validation downstream. + clean_schema = SafeLineLoader.strip_metadata(spec) + + # If no error handling is specified for the main transform, warn the user + # that the pipeline may fail if any output data fails the output schema + # validation. 
+ if not error_handling_spec: + _LOGGER.warning("Output_schema config is attached to a transform that has "\ + "no error_handling config specified. Any failures validating on output" \ + "schema will fail the pipeline unless the user specifies an" \ + "error_handling config on a capable transform or the user can remove the" \ + "output_schema config on this transform and add a ValidateWithSchema " \ + "transform downstream of the current transform.") + + # The transform produced outputs with a single beam.PCollection + if isinstance(outputs, beam.PCollection): + outputs = _enforce_schema( + outputs, 'EnforceOutputSchema', error_handling_spec, clean_schema) + if isinstance(outputs, dict): + main_tag = error_handling_spec.get('main_tag', 'good') + main_output = outputs.pop(main_tag) + if error_handling_spec: + error_output_tag = error_handling_spec.get('output') + if error_output_tag in outputs: + return { + 'output': main_output, + error_output_tag: outputs.pop(error_output_tag) + } + return main_output + + # The transform produced outputs with many named PCollections and need to + # determine which PCollection should be validated on. + elif isinstance(outputs, dict): + main_output_key = _get_main_output_key(spec, outputs)

Review Comment:

> There is a doc string for the get_main_output_key method. Is that sufficient?

This won't really be user visible (I wouldn't expect someone writing yaml pipelines to see this), so I think it would be better to include it in yaml-schema.md.

> There is a warning in that get_main_output_key method. Is that sufficient?

I don't see a warning, but maybe I'm looking in the wrong place. I see a ValueError if there is no main output, but specifically I'm interested in warning if both of the following are true:

1. There is a main output (which will get validated)
2. There is a second non-error-handling output (which will not get validated)

Is that case already covered?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org