[ 
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=755458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755458
 ]

ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 22:03
            Start Date: 11/Apr/22 22:03
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r847770714


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
   def infer_output_type(self, input_type):
     return self.fn.infer_output_type(input_type)
 
+  def infer_batch_converters(self, input_element_type):
+    # This assumes batch input implies batch output
+    # TODO: Define and handle yields_batches and yields_elements
+    if self.fn.process_batch_defined:
+      input_batch_type = self.fn.get_input_batch_type()
+
+      if input_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "an input type annoation")
+
+      output_batch_type = self.fn.get_output_batch_type()
+      if output_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "a return type annoation")
+
+      # Generate a batch converter to convert between the input type and the
+      # (batch) input type of process_batch
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=input_element_type, batch_type=input_batch_type)
+
+      # Generate a batch converter to convert between the output type and the
+      # (batch) output type of process_batch
+      output_element_type = self.infer_output_type(input_element_type)
+      self.fn.input_batch_converter = BatchConverter.from_typehints(

Review Comment:
   Yes, thanks. I noticed this one while working on the worker changes.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755458)
    Time Spent: 1.5h  (was: 1h 20m)

> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
>                 Key: BEAM-14213
>                 URL: https://issues.apache.org/jira/browse/BEAM-14213
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python 
> SDK.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to