[ 
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=761131&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761131
 ]

ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Apr/22 21:59
            Start Date: 22/Apr/22 21:59
    Worklog Time Spent: 10m 
      Work Description: apilloud commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r856597444


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -685,6 +690,53 @@ def infer_output_type(self, input_type):
         self._strip_output_annotations(
             trivial_inference.infer_return_type(self.process, [input_type])))
 
+  @property
+  def process_defined(self) -> bool:
+    return (
+        self.process.__func__  # type: ignore
+        if hasattr(self.process, '__self__') else self.process) != DoFn.process
+
+  @property
+  def process_batch_defined(self) -> bool:
+    return (
+        self.process_batch.__func__  # type: ignore
+        if hasattr(self.process_batch, '__self__')
+        else self.process_batch) != DoFn.process_batch
+
+  def get_input_batch_type(self) -> typing.Optional[TypeConstraint]:
+    if not self.process_batch_defined:
+      return None
+    input_type = list(
+        inspect.signature(self.process_batch).parameters.values())[0].annotation
+    if input_type == inspect.Signature.empty:
+      # TODO(BEAM-14340): Consider supporting an alternative (dynamic?) approach
+      # for declaring input type
+      raise TypeError(
+          f"{self.__class__.__name__}.process_batch() does not have a type "
+          "annotation on it's first parameter. This is reqired for "

Review Comment:
   s/reqired/required/?
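
   For context, here is a minimal standalone sketch of the two checks in this hunk: detecting whether `process_batch` is overridden, and reading the annotation on its first parameter. `BaseFn`, `Annotated`, and `Unannotated` are hypothetical stand-ins, not Beam classes, and the error text is shortened; the real `DoFn` does more than this.

```python
import inspect
import typing


class BaseFn:
  """Hypothetical stand-in for DoFn; not Beam's actual class."""
  def process_batch(self, batch):
    raise NotImplementedError

  @property
  def process_batch_defined(self) -> bool:
    # A bound method exposes the underlying function via __func__;
    # compare that function against the base-class default.
    method = self.process_batch
    func = method.__func__ if hasattr(method, '__self__') else method
    return func != BaseFn.process_batch

  def get_input_batch_type(self):
    if not self.process_batch_defined:
      return None
    # inspect.signature() on a bound method omits `self`, so the first
    # parameter is the batch argument.
    params = list(inspect.signature(self.process_batch).parameters.values())
    annotation = params[0].annotation
    if annotation == inspect.Signature.empty:
      raise TypeError(
          f"{self.__class__.__name__}.process_batch() does not have a type "
          "annotation on its first parameter.")
    return annotation


class Annotated(BaseFn):
  def process_batch(self, batch: typing.List[int]):
    yield batch


class Unannotated(BaseFn):
  def process_batch(self, batch):
    yield batch
```

   Under these assumptions, `BaseFn().get_input_batch_type()` returns `None`, `Annotated()` yields the declared annotation, and `Unannotated()` raises the `TypeError` whose message is being corrected here.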





Issue Time Tracking
-------------------

    Worklog Id:     (was: 761131)
    Time Spent: 3h  (was: 2h 50m)

> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
>                 Key: BEAM-14213
>                 URL: https://issues.apache.org/jira/browse/BEAM-14213
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python 
> SDK.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
