[
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=761131&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761131
]
ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Apr/22 21:59
Start Date: 22/Apr/22 21:59
Worklog Time Spent: 10m
Work Description: apilloud commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r856597444
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -685,6 +690,53 @@ def infer_output_type(self, input_type):
self._strip_output_annotations(
trivial_inference.infer_return_type(self.process, [input_type])))
+ @property
+ def process_defined(self) -> bool:
+ return (
+ self.process.__func__ # type: ignore
+ if hasattr(self.process, '__self__') else self.process) != DoFn.process
+
+ @property
+ def process_batch_defined(self) -> bool:
+ return (
+ self.process_batch.__func__ # type: ignore
+ if hasattr(self.process_batch, '__self__')
+ else self.process_batch) != DoFn.process_batch
+
+ def get_input_batch_type(self) -> typing.Optional[TypeConstraint]:
+ if not self.process_batch_defined:
+ return None
+ input_type = list(
+
inspect.signature(self.process_batch).parameters.values())[0].annotation
+ if input_type == inspect.Signature.empty:
+ # TODO(BEAM-14340): Consider supporting an alternative (dynamic?)
approach
+ # for declaring input type
+ raise TypeError(
+ f"{self.__class__.__name__}.process_batch() does not have a type "
+ "annotation on it's first parameter. This is reqired for "
Review Comment:
s/reqired/required/?
Issue Time Tracking
-------------------
Worklog Id: (was: 761131)
Time Spent: 3h (was: 2h 50m)
> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
> Key: BEAM-14213
> URL: https://issues.apache.org/jira/browse/BEAM-14213
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python
> SDK.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)