TheNeuralBit commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r847770714
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "an input type annoation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "a return type annoation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
Review Comment:
Yes, thanks. I noticed this one while working on the worker changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]