TheNeuralBit commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r849980799
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -685,6 +690,48 @@ def infer_output_type(self, input_type):
self._strip_output_annotations(
trivial_inference.infer_return_type(self.process, [input_type])))
+ @property
+ def process_defined(self) -> bool:
+ return (
+ self.process.__func__ # type: ignore
+ if hasattr(self.process, '__self__') else self.process) != DoFn.process
+
+ @property
+ def process_batch_defined(self) -> bool:
+ return (
+ self.process_batch.__func__ # type: ignore
+ if hasattr(self.process_batch, '__self__')
+ else self.process_batch) != DoFn.process_batch
+
+ def get_input_batch_type(self) -> typing.Optional[TypeConstraint]:
+ if not self.process_batch_defined:
+ return None
+ input_type = list(
+
inspect.signature(self.process_batch).parameters.values())[0].annotation
+ if input_type == inspect.Signature.empty:
+ raise TypeError("TODO")
Review Comment:
Note to self: please fix and add a test checking this
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
+ BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+ return batching_util_fn
+
+ @staticmethod
+ def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+ for constructor in BATCH_CONVERTER_REGISTRY:
+ result = constructor(element_type, batch_type)
+ if result is not None:
+ return result
+
+ # TODO: Include explanations for realistic candidate BatchConverter
+ raise TypeError(
+ f"Unable to find BatchConverter for element_type {element_type!r} and "
+ f"batch_type {batch_type!r}")
+
+ @property
+ def batch_type(self):
+ return self._batch_type
+
+ @property
+ def element_type(self):
+ return self._element_type
+
+
+N = "ARBITRARY LENGTH DIMENSION"
+
+
+class NumpyBatchConverter(BatchConverter):
Review Comment:
Good point, done.
##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+
+class BatchDoFn(beam.DoFn):
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+ def process_batch(self, batch: List[int], *args, **kwargs):
+ yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+ dofn: beam.DoFn
+ process_defined: bool
Review Comment:
These are meant to be expected values for the below parameterized test.
Using a NamedTuple here was a bad idea, I switched the logic to just use
dictionaries as they intended.
I might be missing your point though
##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+
+class BatchDoFn(beam.DoFn):
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+ def process_batch(self, batch: List[int], *args, **kwargs):
+ yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+ yield element
+
+ def process_batch(self, batch: List[int], *args,
+ **kwargs) -> Iterator[List[int]]:
+ yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+ dofn: beam.DoFn
+ process_defined: bool
+ process_batch_defined: bool
+ input_batch_type: Optional[type]
+ output_batch_type: Optional[type]
+
+
+def make_map_pardo():
+ return beam.Map(lambda x: x * 2).dofn
+
+
+@parameterized_class(
+ BatchDoFnTestCase.__annotations__.keys(),
+ [
+ BatchDoFnTestCase(
+ dofn=ElementDoFn(),
+ process_defined=True,
+ process_batch_defined=False,
+ input_batch_type=None,
+ output_batch_type=None),
+ BatchDoFnTestCase(
+ dofn=BatchDoFn(),
+ process_defined=False,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ BatchDoFnTestCase(
+ dofn=BatchDoFnNoReturnAnnotation(),
+ process_defined=False,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ BatchDoFnTestCase(
+ dofn=EitherDoFn(),
+ process_defined=True,
+ process_batch_defined=True,
+ input_batch_type=beam.typehints.List[int],
+ output_batch_type=beam.typehints.List[int]),
+ #BatchDoFnTestCase(
+ # dofn=make_map_pardo().dofn,
+ # process_defined=True,
+ # process_batch_defined=False,
+ # input_batch_type=None,
+ # batch_output_type=None),
+ ],
+ class_name_func=lambda _,
Review Comment:
I broke this out as a free function
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
Review Comment:
Added typehint
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
def infer_output_type(self, input_type):
return self.fn.infer_output_type(input_type)
+ def infer_batch_converters(self, input_element_type):
+ # This assumes batch input implies batch output
+ # TODO: Define and handle yields_batches and yields_elements
+ if self.fn.process_batch_defined:
+ input_batch_type = self.fn.get_input_batch_type()
+
+ if input_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "an input type annoation")
+
+ output_batch_type = self.fn.get_output_batch_type()
+ if output_batch_type is None:
+ raise TypeError(
+ "process_batch method on {self.fn!r} does not have "
+ "a return type annoation")
+
+ # Generate a batch converter to convert between the input type and the
+ # (batch) input type of process_batch
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=input_element_type, batch_type=input_batch_type)
+
+ # Generate a batch converter to convert between the output type and the
+ # (batch) output type of process_batch
+ output_element_type = self.infer_output_type(input_element_type)
+ self.fn.input_batch_converter = BatchConverter.from_typehints(
+ element_type=output_element_type, batch_type=output_batch_type)
+
+ def infer_output_batch_type(self):
+ # TODO: Handle process() with @yields_batch
Review Comment:
Done - BEAM-14293
##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+ def __init__(self, batch_type, element_type):
+ self._batch_type = batch_type
+ self._element_type = element_type
+
+ def produce_batch(self, elements: Sequence[E]) -> B:
+ """Convert an instance of List[E] to a single instance of B."""
+ raise NotImplementedError
+
+ def explode_batch(self, batch: B) -> Iterator[E]:
+ """Convert an instance of B to Iterator[E]."""
+ raise NotImplementedError
+
+ def combine_batches(self, batches: Sequence[B]) -> B:
+ raise NotImplementedError
+
+ def get_length(self, batch: B) -> int:
+ raise NotImplementedError
+
+ @staticmethod
+ def register(batching_util_fn):
+ BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+ return batching_util_fn
+
+ @staticmethod
+ def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+ for constructor in BATCH_CONVERTER_REGISTRY:
+ result = constructor(element_type, batch_type)
Review Comment:
I assume you're thinking we would catch the exceptions and potentially
aggregate their messages to pass along to the user?
That could make sense, but I worry it will get very noisy as the number of
implementations grows. We might define a "near miss" concept to filter out just
the messages that are potentially relevant. What do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]