yeandy commented on a change in pull request #16351:
URL: https://github.com/apache/beam/pull/16351#discussion_r810376147



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -3275,3 +3286,26 @@ def to_runner_api_parameter(self, unused_context):
   def from_runner_api_parameter(
       unused_ptransform, unused_parameter, unused_context):
     return Impulse()
+
+
+def _strip_output_annotations(type_hint):
+  # TODO(robertwb): These should be parameterized types that the
+  # type inferencer understands.
+  # Then we can replace them with the correct element types instead of
+  # using Any. Refer to typehints.WindowedValue when doing this.
+  annotations = (TimestampedValue, WindowedValue, pvalue.TaggedOutput)
+
+  contains_annotation = False
+
+  def visitor(t, unused_args):
+    if t in annotations:
+      raise StopIteration
+
+  try:
+    visit_inner_types(type_hint, visitor, [])
+  except StopIteration:
+    contains_annotation = True
+
+  if contains_annotation:
+    return typehints.Any
+  return type_hint

Review comment:
       ```suggestion
   return typehints.Any if contains_annotation else type_hint
   ```

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1714,14 +1713,20 @@ def MapTuple(fn, *args, **kwargs):  # pylint: 
disable=invalid-name
 
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
-  type_hints = get_type_hints(fn)
+  type_hints = get_type_hints(fn).with_defaults(
+      typehints.decorators.IOTypeHints.from_callable(fn))
   if type_hints.input_types is not None:
-    wrapper = with_input_types(
-        *type_hints.input_types[0], **type_hints.input_types[1])(
-            wrapper)
+    # FIXME(feyu): ignore input hints, as we do not have enough

Review comment:
       Is this tied to a different BEAM issue as a future fix?

##########
File path: sdks/python/apache_beam/typehints/typehints.py
##########
@@ -175,6 +175,35 @@ def visit(self, visitor, visitor_arg):
         visitor(t, visitor_arg)
 
 
+def visit_inner_types(type_constraint, visitor, visitor_arg):
+  """Visitor pattern to visit all inner types of a type constraint.
+
+  Args:
+    type_constraint: A type constraint or a type.
+    visitor: A callable invoked for all nodes in the type tree comprising a
+      composite type. The visitor will be called with the node visited and the
+      visitor argument specified here.
+    visitor_arg: Visitor callback second argument.
+
+  Note:
+    Raise and capture a StopIteration to terminate the visit, e.g.
+
+    ```
+    def visitor(type_constraint, visitor_arg):
+      if ...:
+        raise StopIteration
+
+    try:
+      visit_inner_types(type_constraint, visitor, visitor_arg)
+    except StopIteration:
+      pass
+    ```
+  """
+  if isinstance(type_constraint, TypeConstraint):
+    return type_constraint.visit(visitor, visitor_arg)
+  return visitor(type_constraint, visitor_arg)

Review comment:
       This looks largely like the same logic in `TypeConstraint`'s `visit()` 
method. Would it make sense to refactor the logic into `TypeConstraint.visit()` 
?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to