bzablocki commented on code in PR #27145:
URL: https://github.com/apache/beam/pull/27145#discussion_r1238317400


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2182,6 +2185,9 @@ def expand(self, pcoll):
         *self._args,
         **self._kwargs).with_outputs(
             self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True)
+    #TODO(BEAM-18957): Fix when type inference supports tagged outputs.
+    result[self._main_tag].element_type = self._original_infer_element_type(
+        pcoll.element_type)

Review Comment:
   Since `_original_infer_element_type` is only called from within this function, could we avoid adding it as another parameter to the `_ExceptionHandlingWrapper` constructor?
   Then we could write this here:
   ```suggestion
       result[self._main_tag].element_type = self._fn.infer_output_type(
           pcoll.element_type)
   ```
   Advantages of my suggestion: 
   - we would avoid adding another parameter to an already long parameter list, and
   - we wouldn't mix simple-typed arguments with function references (although type hints could make that clearer)
   
   Disadvantage: 
   - if we ever create an `_ExceptionHandlingWrapper` with an `fn` that does not implement `infer_output_type()`, we'd need to go back to this implementation. But should we optimize for a future case like that?
   
   What's your opinion on that? Do you see other benefits of your solution? 
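
   To make the trade-off concrete, here is a minimal sketch using stand-in classes. None of these are Beam's actual classes: `StrFn`, `OpaqueFn`, `WrapperWithCallback`, `WrapperUsingFn` and `main_output_type` are hypothetical names, and the real `_ExceptionHandlingWrapper` takes more arguments than shown.

```python
class StrFn:
    """Hypothetical fn that can report its own output type."""
    def infer_output_type(self, input_type):
        return str


class OpaqueFn:
    """Hypothetical fn with no infer_output_type() method."""


class WrapperWithCallback:
    """Shape of the PR as written: inference function passed to the constructor."""
    def __init__(self, fn, original_infer_element_type):
        self._fn = fn
        self._original_infer_element_type = original_infer_element_type

    def main_output_type(self, input_type):
        return self._original_infer_element_type(input_type)


class WrapperUsingFn:
    """Shape of the suggestion: ask the wrapped fn directly."""
    def __init__(self, fn):
        self._fn = fn

    def main_output_type(self, input_type):
        return self._fn.infer_output_type(input_type)


fn = StrFn()
via_callback = WrapperWithCallback(fn, fn.infer_output_type).main_output_type(int)
via_fn = WrapperUsingFn(fn).main_output_type(int)

# The disadvantage: the suggested form needs the method to exist on fn.
try:
    WrapperUsingFn(OpaqueFn()).main_output_type(int)
    opaque_supported = True
except AttributeError:
    opaque_supported = False
```

   Both forms give the same answer when the `fn` implements `infer_output_type()`; only the callback form still works for an `fn` without it, at the cost of one more constructor argument.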
   
   



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2244,6 +2250,82 @@ def process(self, *args, **kwargs):
                   traceback.format_exception(*sys.exc_info()))))
 
 
+# Idea adapted from Asguard.

Review Comment:
   Did you mean "asgarde" (https://github.com/tosun-si/asgarde)?



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2244,6 +2250,82 @@ def process(self, *args, **kwargs):
                   traceback.format_exception(*sys.exc_info()))))
 
 
+# Idea adapted from Asguard.
+# TODO(robertwb): Consider how this could fit into the public API.
+# TODO(robertwb): Generalize to all PValue types.
+class _PValueWithErrors(object):
+  """This wraps a PCollection such that transforms can be chained in a linear
+  manner while still accumulating any errors."""
+  def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
+    self._pcoll = pcoll
+    self._exception_handling_args = exception_handling_args
+    self._upstream_errors = upstream_errors
+
+  def main_output_tag(self):
+    return self._exception_handling_args.get('main_tag', 'good')
+
+  def error_output_tag(self):
+    return self._exception_handling_args.get('dead_letter_tag', 'bad')
+
+  def __or__(self, transform):
+    return self.apply(transform)
+
+  def apply(self, transform):
+    result = self._pcoll | transform.with_exception_handling(
+        **self._exception_handling_args)
+    from apache_beam.typehints import typehints
+    if result[self.main_output_tag()].element_type == typehints.Any:
+      result[self.main_output_tag()].element_type = transform.infer_output_type(
+          self._pcoll.element_type)
+    # TODO(BEAM-18957): Add support for tagged type hints.
+    result[self.error_output_tag()].element_type = typehints.Any
+    return _PValueWithErrors(
+        result[self.main_output_tag()],
+        self._exception_handling_args,
+        self._upstream_errors + (result[self.error_output_tag()], ))
+
+  def accumulated_errors(self):
+    if len(self._upstream_errors) == 1:
+      return self._upstream_errors[0]
+    else:
+      return self._upstream_errors | Flatten()

Review Comment:
   Why do we need to handle the case where `self._upstream_errors` contains a single element separately? Shouldn't `Flatten()` handle that?
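
   As a plain-Python analogy of what I mean (lists stand in for PCollections and `itertools.chain` stands in for `Flatten()`; this is not Beam code and both function names are hypothetical):

```python
from itertools import chain


def accumulated_errors_special_cased(upstream_errors):
    # Same shape as the code under review: hand back the lone
    # collection directly, otherwise merge them all.
    if len(upstream_errors) == 1:
        return upstream_errors[0]
    return list(chain.from_iterable(upstream_errors))


def accumulated_errors_always_flatten(upstream_errors):
    # No special case: always merge, even for a single collection.
    return list(chain.from_iterable(upstream_errors))


single = (["e1", "e2"],)
several = (["e1"], ["e2", "e3"])

same_for_single = (
    accumulated_errors_special_cased(single)
    == accumulated_errors_always_flatten(single))
same_for_several = (
    accumulated_errors_special_cased(several)
    == accumulated_errors_always_flatten(several))
```

   In this analogy the contents are identical either way; the special case only avoids building a new collection when there is a single input. If the intent in the PR is similar (for example, avoiding an extra step in the pipeline for the single-error case), a short comment saying so would help.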



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
