damccorm commented on code in PR #33094:
URL: https://github.com/apache/beam/pull/33094#discussion_r1840909138
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -482,6 +483,65 @@ def expand(pcoll, error_handling=None, **kwargs):
return expand
+class _StripErrorMetadata(beam.PTransform):
+  """Strips error metadata from outputs returned via error handling.
+
+  Generally the error outputs for transformations return information about
+  the error encountered (e.g. error messages and tracebacks) in addition to the
+  failing element itself. This transformation attempts to remove that metadata
+  and returns the bad element alone, which can be useful for re-processing.
+
+  For example, in the following pipeline snippet::
+
+      - name: MyMappingTransform
+        type: MapToFields
+        input: SomeInput
+        config:
+          language: python
+          fields:
+            ...
+          error_handling:
+            output: errors
+
+      - name: RecoverOriginalElements
+        type: StripErrorMetadata
+        input: MyMappingTransform.errors
+
+  the output of `RecoverOriginalElements` will contain exactly those elements
+  from `SomeInput` that failed to process (whereas `MyMappingTransform.errors`
+  would contain those elements paired with error information).
+
+  Note that this relies on the preceding transform actually returning the
+  failing input in a schema'd way. Most built-in transformations follow the
+  correct conventions.
+  """
+
+  _ERROR_FIELD_NAMES = ('failed_row', 'element', 'record')
+
+  def expand(self, pcoll):
+    try:
+      existing_fields = {
+          fld.name: fld.type
+          for fld in schema_from_element_type(pcoll.element_type).fields
+      }
+    except TypeError:
+      fld = None
+    else:
+      for fld in self._ERROR_FIELD_NAMES:
+        if fld in existing_fields:
+          break
+      else:
+        raise ValueError(
+            f"No field name matches one of {self._ERROR_FIELD_NAMES}")
Review Comment:
I think we should expand on this error message and explain what it means
(something like "The previous transform does not support the
`StripErrorMetadata` transform and must output error records in XYZ format").
I'm not going to block submission of the PR on this, since I think it is
valuable for this to make the release cut.
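A minimal plain-Python sketch of the lookup the comment asks to explain, with one possible wording for the more descriptive error. It is not Beam code: `find_error_field` and `strip_error_metadata` are hypothetical names, the sketch works on a list of dicts rather than a schema'd PCollection, and it assumes every error record carries the same fields. The candidate order mirrors `_ERROR_FIELD_NAMES` in the diff, so the first matching name wins, just as the `for`/`break` loop there does.

```python
from typing import Any, Iterable, List, Mapping, Sequence

# Same candidates, in the same priority order, as _ERROR_FIELD_NAMES in the diff.
ERROR_FIELD_NAMES = ('failed_row', 'element', 'record')


def find_error_field(field_names: Iterable[str],
                     candidates: Sequence[str] = ERROR_FIELD_NAMES) -> str:
  """Returns the first candidate that appears among field_names.

  Hypothetical helper: raises a ValueError that tells the user what the
  upstream transform must emit, instead of only listing the candidates.
  """
  present = set(field_names)
  for name in candidates:
    if name in present:
      return name
  raise ValueError(
      "The previous transform does not support the StripErrorMetadata "
      "transform: its error output must be schema'd records where one of the "
      f"fields {list(candidates)} holds the original failing element, but "
      f"the records only have the fields {sorted(present)}.")


def strip_error_metadata(error_records: List[Mapping[str, Any]]) -> List[Any]:
  """Plain-Python stand-in for the transform: keeps only the failing element."""
  if not error_records:
    return []
  # Assumes all records share the schema of the first one.
  field = find_error_field(error_records[0].keys())
  return [record[field] for record in error_records]


if __name__ == '__main__':
  errors = [{'element': {'x': 1}, 'msg': 'division by zero', 'stack': '...'}]
  print(strip_error_metadata(errors))
```

Running the module prints `[{'x': 1}]`: the error message and traceback are dropped, and only the original element survives for re-processing.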
--
This is an automated message from the Apache Git Service.