robertwb commented on code in PR #27145:
URL: https://github.com/apache/beam/pull/27145#discussion_r1240030006


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2244,6 +2250,82 @@ def process(self, *args, **kwargs):
                   traceback.format_exception(*sys.exc_info()))))
 
 
+# Idea adapted from Asguard.
+# TODO(robertwb): Consider how this could fit into the public API.
+# TODO(robertwb): Generalize to all PValue types.
+class _PValueWithErrors(object):
+  """This wraps a PCollection such that transforms can be chained in a linear
+  manner while still accumulating any errors."""
+  def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
+    self._pcoll = pcoll
+    self._exception_handling_args = exception_handling_args
+    self._upstream_errors = upstream_errors
+
+  def main_output_tag(self):
+    return self._exception_handling_args.get('main_tag', 'good')
+
+  def error_output_tag(self):
+    return self._exception_handling_args.get('dead_letter_tag', 'bad')
+
+  def __or__(self, transform):
+    return self.apply(transform)
+
+  def apply(self, transform):
+    result = self._pcoll | transform.with_exception_handling(
+        **self._exception_handling_args)
+    from apache_beam.typehints import typehints
+    if result[self.main_output_tag()].element_type == typehints.Any:
+      result[self.main_output_tag()].element_type = 
transform.infer_output_type(
+          self._pcoll.element_type)
+    # TODO(BEAM-18957): Add support for tagged type hints.
+    result[self.error_output_tag()].element_type = typehints.Any
+    return _PValueWithErrors(
+        result[self.main_output_tag()],
+        self._exception_handling_args,
+        self._upstream_errors + (result[self.error_output_tag()], ))
+
+  def accumulated_errors(self):
+    if len(self._upstream_errors) == 1:
+      return self._upstream_errors[0]
+    else:
+      return self._upstream_errors | Flatten()

Review Comment:
   Maybe, but that might have issues with streaming pipeline upgrades. Keeping 
as is for now as not all runners handle one-element flattens optimally. 



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2244,6 +2250,82 @@ def process(self, *args, **kwargs):
                   traceback.format_exception(*sys.exc_info()))))
 
 
+# Idea adapted from Asguard.

Review Comment:
   Yes. Fixed (added reference). 



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2182,6 +2185,9 @@ def expand(self, pcoll):
         *self._args,
         **self._kwargs).with_outputs(
             self._dead_letter_tag, main=self._main_tag, 
allow_unknown_tags=True)
+    #TODO(BEAM-18957): Fix when type inference supports tagged outputs.
+    result[self._main_tag].element_type = self._original_infer_element_type(
+        pcoll.element_type)

Review Comment:
   Yes, we could do that. `infer_output_type` is overridden by WindowInto, but 
an implementation is easily enough added to the implementing DoFn itself. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to