[
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=754855&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754855
]
ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Apr/22 23:13
Start Date: 08/Apr/22 23:13
Worklog Time Spent: 10m
Work Description: robertwb commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r846527918
##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+
+class BatchDoFn(beam.DoFn):
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+ def process_batch(self, batch: List[int], *args, **kwargs):
+ yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+ dofn: beam.DoFn
+ process_defined: bool
Review Comment:
Aren't these both functions of `dofn`? Can we simply infer them?
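A minimal sketch of how both flags could be inferred from the DoFn itself, by checking whether the subclass overrides the base-class method. The `DoFn` class here is a stand-in for `beam.DoFn`, and `method_defined` is a hypothetical helper, not Beam API.

```python
class DoFn:
    """Stand-in for beam.DoFn: default methods that subclasses override."""

    def process(self, element, *args, **kwargs):
        raise NotImplementedError

    def process_batch(self, batch, *args, **kwargs):
        raise NotImplementedError


def method_defined(dofn: DoFn, name: str) -> bool:
    """Hypothetical helper: True if type(dofn) overrides DoFn.<name>."""
    return getattr(type(dofn), name) is not getattr(DoFn, name)


class ElementDoFn(DoFn):
    def process(self, element, *args, **kwargs):
        yield element


class EitherDoFn(DoFn):
    def process(self, element, *args, **kwargs):
        yield element

    def process_batch(self, batch, *args, **kwargs):
        yield [element * 2 for element in batch]
```

With something like this, each test case would only need `dofn` plus the expected batch types.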
##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,14 @@ cdef class WindowedValue(object):
cpdef WindowedValue with_value(self, new_value)
[email protected]
Review Comment:
In particular, for non-1:1 batch processing, we may want to (optionally?)
expose the windowing information as part of the batch itself, e.g. as
additional columns (for a dataframe-like batch), as a list of WindowedValues
(for a List batch), or in some other form (still an open question).
An alternative is to require batches with homogeneous windowing information,
which would allow dropping the 1:1 requirement at the expense of possibly
smaller batches.
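To illustrate the homogeneous-windowing alternative, a minimal sketch that cuts a stream of windowed values into batches whose elements share the same timestamp and windows. `WindowedValue` here is a simplified stand-in (no pane info), and `homogeneous_batches` is a hypothetical helper.

```python
from itertools import groupby
from typing import Any, Iterator, List, NamedTuple, Tuple


class WindowedValue(NamedTuple):
    """Simplified stand-in for Beam's WindowedValue (pane info omitted)."""
    value: Any
    timestamp: int
    windows: Tuple[Any, ...]


def _windowing_key(wv: WindowedValue) -> Tuple[int, Tuple[Any, ...]]:
    return wv.timestamp, wv.windows


def homogeneous_batches(
        values: List[WindowedValue],
        max_size: int) -> Iterator[Tuple[int, Tuple[Any, ...], List[Any]]]:
    """Yield (timestamp, windows, batch) with identical windowing per batch.

    Consecutive elements are grouped by (timestamp, windows); a new batch
    starts whenever the windowing changes or max_size is reached.
    """
    for (timestamp, windows), group in groupby(values, key=_windowing_key):
        batch: List[Any] = []
        for wv in group:
            batch.append(wv.value)
            if len(batch) == max_size:
                yield timestamp, windows, batch
                batch = []
        if batch:
            yield timestamp, windows, batch
```

The cost mentioned above shows up when windowing is interleaved: a value in window `w1` that arrives after a `w2` value starts a new, smaller batch instead of joining the earlier `w1` batch.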
##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+
+class BatchDoFn(beam.DoFn):
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
Review Comment:
I think we should get feedback from the mailing list on whether this should
return an iterable of batches or a single batch. There are pros and cons to each.
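To make the two options concrete, a minimal sketch using plain functions on `List[int]` batches rather than DoFn methods. Style A (the one in the diff) can emit several output batches, or none, per input batch; style B is simpler to type and reason about but forces exactly one output batch. The chunk size of 2 is arbitrary and only there to show splitting.

```python
from typing import Iterable, Iterator, List


def process_batch_iterable(batch: List[int]) -> Iterator[List[int]]:
    """Style A: yield zero or more output batches per input batch."""
    doubled = [element * 2 for element in batch]
    # Free to split the output, here into chunks of at most 2 elements.
    for start in range(0, len(doubled), 2):
        yield doubled[start:start + 2]


def process_batch_single(batch: List[int]) -> List[int]:
    """Style B: return exactly one output batch per input batch."""
    return [element * 2 for element in batch]


def flatten(batches: Iterable[List[int]]) -> List[int]:
    """Logical elements of a sequence of batches."""
    return [element for b in batches for element in b]
```

Both styles describe the same logical output; they differ in how much freedom the implementation has over batch boundaries.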
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "an input type annoation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "a return type annoation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
Review Comment:
self.fn.output_batch_converter?
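A minimal sketch of the method with the second converter stored separately. It also uses f-strings for the error messages; without the `f` prefix, `{self.fn!r}` is emitted literally. `BatchConverter` is a stand-in whose `from_typehints` just records the types (the real one searches a registry), and the output element type is passed in rather than computed with `infer_output_type`.

```python
from typing import Any, Optional


class BatchConverter:
    """Stand-in: records the (element_type, batch_type) pair it converts."""

    def __init__(self, element_type: Any, batch_type: Any):
        self.element_type = element_type
        self.batch_type = batch_type

    @staticmethod
    def from_typehints(*, element_type, batch_type) -> "BatchConverter":
        return BatchConverter(element_type, batch_type)


def infer_batch_converters(fn: Any, input_element_type: Any,
                           output_element_type: Any) -> None:
    """Attach separate input and output batch converters to fn."""
    input_batch_type: Optional[Any] = fn.get_input_batch_type()
    if input_batch_type is None:
        # The f prefix is required for {fn!r} to be interpolated.
        raise TypeError(
            f"process_batch method on {fn!r} does not have "
            "an input type annotation")

    output_batch_type: Optional[Any] = fn.get_output_batch_type()
    if output_batch_type is None:
        raise TypeError(
            f"process_batch method on {fn!r} does not have "
            "a return type annotation")

    fn.input_batch_converter = BatchConverter.from_typehints(
        element_type=input_element_type, batch_type=input_batch_type)
    # A distinct attribute, so the input converter is not overwritten.
    fn.output_batch_converter = BatchConverter.from_typehints(
        element_type=output_element_type, batch_type=output_batch_type)
```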
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
Review Comment:
I know it's likely messy, but this parameter would benefit from type hints
(and maybe a docstring).
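One way the parameter could be typed and documented, as a sketch assuming the contract used by `from_typehints` in the diff: a constructor is called as `constructor(element_type, batch_type)` and returns a converter or `None`. The alias name `BatchConverterConstructor` is hypothetical, and `register` is shown as a module-level function instead of a staticmethod for brevity.

```python
from typing import Any, Callable, List, Optional


class BatchConverter:
    """Stand-in for the BatchConverter base class in the diff."""


# Called with (element_type, batch_type); returns a BatchConverter for
# that pair, or None if this constructor does not support it.
BatchConverterConstructor = Callable[[Any, Any], Optional[BatchConverter]]

BATCH_CONVERTER_REGISTRY: List[BatchConverterConstructor] = []


def register(
        batching_util_fn: BatchConverterConstructor
) -> BatchConverterConstructor:
    """Register a BatchConverter constructor and return it unchanged.

    Intended for use as a decorator. ``batching_util_fn`` is called with
    ``(element_type, batch_type)`` and must return a BatchConverter for
    that pair, or None if it does not handle it.
    """
    BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
    return batching_util_fn
```

If the None-returning contract changes (see the later comment on `from_typehints`), only the alias and docstring need updating.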
##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+
+class BatchDoFn(beam.DoFn):
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+ def process_batch(self, batch: List[int], *args, **kwargs):
+ yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+ dofn: beam.DoFn
+ process_defined: bool
+ process_batch_defined: bool
+ input_batch_type: Optional[type]
+ output_batch_type: Optional[type]
+
+
+def make_map_pardo():
+ return beam.Map(lambda x: x * 2).dofn
+
+
+@parameterized_class(
+ BatchDoFnTestCase.__annotations__.keys(),
+ [
+ BatchDoFnTestCase(
+ dofn=ElementDoFn(),
+ process_defined=True,
+ process_batch_defined=False,
+ input_batch_type=None,
+ output_batch_type=None),
+ BatchDoFnTestCase(
+ dofn=BatchDoFn(),
+ process_defined=False,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ BatchDoFnTestCase(
+ dofn=BatchDoFnNoReturnAnnotation(),
+ process_defined=False,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ BatchDoFnTestCase(
+ dofn=EitherDoFn(),
+ process_defined=True,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ #BatchDoFnTestCase(
+ # dofn=make_map_pardo().dofn,
+ # process_defined=True,
+ # process_batch_defined=False,
+ # input_batch_type=None,
+ # batch_output_type=None),
+ ],
+ class_name_func=lambda _,
Review Comment:
This is hard to read; could some parentheses make yapf happier?
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "an input type annoation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "a return type annoation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=output_element_type, batch_type=output_batch_type)
+
+ def infer_output_batch_type(self):
+ # TODO: Handle process() with @yields_batch
Review Comment:
Let's file a JIRA for these TODOs related to `@yields_batch`.
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
+ BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+ return batching_util_fn
+
+ @staticmethod
+ def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+ for constructor in BATCH_CONVERTER_REGISTRY:
+ result = constructor(element_type, batch_type)
+ if result is not None:
+ return result
+
+ # TODO: Include explanations for realistic candidate BatchConverter
+ raise TypeError(
+ f"Unable to find BatchConverter for element_type {element_type!r} and "
+ f"batch_type {batch_type!r}")
+
+ @property
+ def batch_type(self):
+ return self._batch_type
+
+ @property
+ def element_type(self):
+ return self._element_type
+
+
+N = "ARBITRARY LENGTH DIMENSION"
+
+
+class NumpyBatchConverter(BatchConverter):
Review Comment:
ListBatchConverter would be a useful one to have (both in practice, and as a
canonical example).
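A minimal sketch of what a `ListBatchConverter` might look like, mirroring the four methods of the `BatchConverter` base class in the diff. It assumes `typing.List[E]` hints; Beam's own `beam.typehints.List` has a different representation, so the type check would need adapting. `try_construct` is a hypothetical name for the function that would be registered.

```python
import typing
from typing import Any, Iterator, List, Optional, Sequence


class BatchConverter:
    """Stand-in for the BatchConverter base class in the diff."""

    def __init__(self, batch_type, element_type):
        self._batch_type = batch_type
        self._element_type = element_type


class ListBatchConverter(BatchConverter):
    """Batches of type List[E] for elements of type E."""

    def produce_batch(self, elements: Sequence[Any]) -> List[Any]:
        return list(elements)

    def explode_batch(self, batch: List[Any]) -> Iterator[Any]:
        return iter(batch)

    def combine_batches(self, batches: Sequence[List[Any]]) -> List[Any]:
        return [element for batch in batches for element in batch]

    def get_length(self, batch: List[Any]) -> int:
        return len(batch)

    @staticmethod
    def try_construct(element_type,
                      batch_type) -> Optional["ListBatchConverter"]:
        # Only handle batch_type == List[element_type]; returning None
        # lets the registry try other constructors.
        if (typing.get_origin(batch_type) is list
                and typing.get_args(batch_type) == (element_type,)):
            return ListBatchConverter(batch_type, element_type)
        return None
```

Because a list already is a `Sequence[E]`, every method is close to the identity, which is what makes it a useful canonical example next to the NumPy one.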
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
+ BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+ return batching_util_fn
+
+ @staticmethod
+ def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+ for constructor in BATCH_CONVERTER_REGISTRY:
+ result = constructor(element_type, batch_type)
Review Comment:
Should the API be to throw (possibly with an explanatory message) rather
than return None?
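A minimal sketch of the raising convention: each constructor raises `TypeError` explaining why it rejects the pair, and the lookup collects those reasons into its final error, which would also cover the "Include explanations" TODO in the diff. The registry, `Constructor` alias, and `list_constructor` are hypothetical stand-ins.

```python
from typing import Any, Callable, List


class BatchConverter:
    """Stand-in converter that records the types it was built for."""

    def __init__(self, batch_type: Any, element_type: Any):
        self.batch_type = batch_type
        self.element_type = element_type


# Convention: raise TypeError with an explanation instead of returning None.
Constructor = Callable[[Any, Any], BatchConverter]
REGISTRY: List[Constructor] = []


def from_typehints(*, element_type: Any, batch_type: Any) -> BatchConverter:
    reasons = []
    for constructor in REGISTRY:
        try:
            return constructor(element_type, batch_type)
        except TypeError as e:
            reasons.append(f"{constructor.__name__}: {e}")
    raise TypeError(
        f"Unable to find BatchConverter for element_type {element_type!r} "
        f"and batch_type {batch_type!r}. Rejected by:\n" + "\n".join(reasons))


def list_constructor(element_type: Any, batch_type: Any) -> BatchConverter:
    if batch_type is not list:
        raise TypeError(f"batch_type must be list, got {batch_type!r}")
    return BatchConverter(batch_type, element_type)


REGISTRY.append(list_constructor)
```

One trade-off: catching `TypeError` can also hide genuine bugs inside a constructor, so a dedicated exception subclass may be safer.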
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "an input type annoation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "a return type annoation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=output_element_type, batch_type=output_batch_type)
+
+ def infer_output_batch_type(self):
+ # TODO: Handle process() with @yields_batch
+ if not self.fn.process_batch_defined:
Review Comment:
I wonder if here (and above) we could just let
`self.fn.get_{input,output}_batch_type` handle checking for
`process_batch_defined` and a future `yields_batch` rather than scattering this
logic.
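A minimal sketch of that idea: the DoFn owns the `process_batch_defined` check, so callers just test the returned type for `None`. `DoFn` is a stand-in for `beam.DoFn`. Falling back to the input batch type when there is no return annotation is an assumption, taken from the `BatchDoFnNoReturnAnnotation` test case, which expects `List[int]`.

```python
import typing
from typing import Iterator, List, Optional


class DoFn:
    """Stand-in for beam.DoFn that owns all batch-type inference."""

    def process_batch(self, batch, *args, **kwargs):
        raise NotImplementedError

    @property
    def process_batch_defined(self) -> bool:
        return type(self).process_batch is not DoFn.process_batch

    def get_input_batch_type(self) -> Optional[type]:
        # Single place to check process_batch_defined (and, later, a
        # yields_batch-style decorator) so callers need not repeat it.
        if not self.process_batch_defined:
            return None
        return typing.get_type_hints(type(self).process_batch).get("batch")

    def get_output_batch_type(self) -> Optional[type]:
        if not self.process_batch_defined:
            return None
        hints = typing.get_type_hints(type(self).process_batch)
        return_hint = hints.get("return")
        if return_hint is None:
            # Assumption: no return annotation means output == input type.
            return self.get_input_batch_type()
        # Unwrap Iterator[B] to B.
        args = typing.get_args(return_hint)
        return args[0] if args else return_hint


class BatchDoFn(DoFn):
    def process_batch(self, batch: List[int], *args,
                      **kwargs) -> Iterator[List[int]]:
        yield [element * 2 for element in batch]
```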
##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,14 @@ cdef class WindowedValue(object):
cpdef WindowedValue with_value(self, new_value)
[email protected]
Review Comment:
Let's make this an interface. (Maybe split it out into a different PR as
well.)
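A minimal sketch of such an interface in pure Python, with one homogeneous implementation. All names (`WindowedBatch`, `HomogeneousWindowedBatch`, `with_values`, `as_windowed_values`) are hypothetical, and windowed values are reduced to `(value, timestamp, windows)` triples. An implementation that carries per-element windowing, as discussed in the earlier comment, could implement the same interface.

```python
import abc
from typing import Any, Callable, Generic, Iterator, Tuple, TypeVar

B = TypeVar("B")
E = TypeVar("E")


class WindowedBatch(abc.ABC, Generic[B]):
    """Hypothetical interface: a batch of values plus windowing info."""

    @abc.abstractmethod
    def with_values(self, new_values: B) -> "WindowedBatch[B]":
        """Return a batch with the same windowing but new values."""

    @abc.abstractmethod
    def as_windowed_values(
            self, explode_fn: Callable[[B], Iterator[E]]
    ) -> Iterator[Tuple[E, int, Tuple[Any, ...]]]:
        """Explode into (value, timestamp, windows) triples."""


class HomogeneousWindowedBatch(WindowedBatch[B]):
    """Implementation where every element shares timestamp and windows."""

    def __init__(self, values: B, timestamp: int, windows: Tuple[Any, ...]):
        self.values = values
        self.timestamp = timestamp
        self.windows = windows

    def with_values(self, new_values: B) -> "HomogeneousWindowedBatch[B]":
        return HomogeneousWindowedBatch(
            new_values, self.timestamp, self.windows)

    def as_windowed_values(self, explode_fn):
        # Every exploded element inherits the batch's shared windowing.
        for value in explode_fn(self.values):
            yield value, self.timestamp, self.windows
```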
Issue Time Tracking
-------------------
Worklog Id: (was: 754855)
Time Spent: 1h 20m (was: 1h 10m)
> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
> Key: BEAM-14213
> URL: https://issues.apache.org/jira/browse/BEAM-14213
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python
> SDK.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)