[
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=755458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755458
]
ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Apr/22 22:03
Start Date: 11/Apr/22 22:03
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r847770714
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ f"process_batch method on {self.fn!r} does not have "
+ "an input type annotation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ f"process_batch method on {self.fn!r} does not have "
+ "a return type annotation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
Review Comment:
Yes, thanks. I noticed this one while working on the worker changes.
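For context, the hunk above assigns `self.fn.input_batch_converter` twice, even though the second code comment describes the output side. The following is only a minimal sketch of what the second assignment was presumably meant to do. It does not use Beam itself: `StubConverter` and `StubDoFn` are hypothetical stand-ins for `BatchConverter` and a DoFn, and the attribute name `output_batch_converter` is an assumption, not something confirmed by this thread.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class StubConverter:
    """Hypothetical stand-in for BatchConverter; it only records the type pair."""
    element_type: Any
    batch_type: Any


class StubDoFn:
    """Hypothetical stand-in for a DoFn that defines process_batch."""
    input_batch_converter: Optional[StubConverter] = None
    # Assumed attribute name; the quoted hunk never assigns it.
    output_batch_converter: Optional[StubConverter] = None


def infer_batch_converters(fn, input_element_type, output_element_type,
                           input_batch_type, output_batch_type):
    """Attach one converter per direction instead of overwriting the input one."""
    if input_batch_type is None:
        # f-string so that {fn!r} is actually interpolated into the message.
        raise TypeError(
            f"process_batch method on {fn!r} does not have "
            "an input type annotation")
    if output_batch_type is None:
        raise TypeError(
            f"process_batch method on {fn!r} does not have "
            "a return type annotation")

    # Converter between the input element type and the batch input type.
    fn.input_batch_converter = StubConverter(
        element_type=input_element_type, batch_type=input_batch_type)

    # Converter between the output element type and the batch output type,
    # stored under a separate attribute so the input converter survives.
    fn.output_batch_converter = StubConverter(
        element_type=output_element_type, batch_type=output_batch_type)
    return fn
```

With the second assignment stored separately, both converters remain available after the call; in the quoted hunk the input converter would be replaced by the one built from the output types.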
Issue Time Tracking
-------------------
Worklog Id: (was: 755458)
Time Spent: 1.5h (was: 1h 20m)
> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
> Key: BEAM-14213
> URL: https://issues.apache.org/jira/browse/BEAM-14213
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python
> SDK.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)