[ 
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=756782&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756782
 ]

ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/22 00:07
            Start Date: 14/Apr/22 00:07
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r849980799


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -685,6 +690,48 @@ def infer_output_type(self, input_type):
         self._strip_output_annotations(
             trivial_inference.infer_return_type(self.process, [input_type])))
 
+  @property
+  def process_defined(self) -> bool:
+    return (
+        self.process.__func__  # type: ignore
+        if hasattr(self.process, '__self__') else self.process) != DoFn.process
+
+  @property
+  def process_batch_defined(self) -> bool:
+    return (
+        self.process_batch.__func__  # type: ignore
+        if hasattr(self.process_batch, '__self__')
+        else self.process_batch) != DoFn.process_batch
+
+  def get_input_batch_type(self) -> typing.Optional[TypeConstraint]:
+    if not self.process_batch_defined:
+      return None
+    input_type = list(
+        inspect.signature(self.process_batch).parameters.values())[0].annotation
+    if input_type == inspect.Signature.empty:
+      raise TypeError("TODO")

Review Comment:
   Note to self: please fix and add a test checking this
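
A rough sketch of how the missing-annotation branch could report a real message and be tested follows. It is a standalone approximation, not the code in the PR: `BaseFn`, `Annotated`, `Unannotated`, and the error text are placeholders invented here, and the override check is simplified to a class-level comparison. Only `inspect.signature` and `inspect.Parameter.empty` are real standard-library names.

```python
import inspect


class BaseFn:
  """Hypothetical stand-in for DoFn; only process_batch matters here."""
  def process_batch(self, batch, *args, **kwargs):
    raise NotImplementedError

  @property
  def process_batch_defined(self) -> bool:
    # True when a subclass supplies its own process_batch.
    return type(self).process_batch is not BaseFn.process_batch

  def get_input_batch_type(self):
    if not self.process_batch_defined:
      return None
    # For a bound method, `self` is already excluded from the signature,
    # so the first parameter is the batch argument.
    params = list(inspect.signature(self.process_batch).parameters.values())
    annotation = params[0].annotation
    if annotation is inspect.Parameter.empty:
      raise TypeError(
          f"process_batch on {type(self).__name__} has no annotation on "
          f"its first parameter ({params[0].name!r})")
    return annotation


class Annotated(BaseFn):
  def process_batch(self, batch: list, *args, **kwargs):
    yield batch


class Unannotated(BaseFn):
  def process_batch(self, batch, *args, **kwargs):
    yield batch
```

A test for this branch would then assert that `Unannotated().get_input_batch_type()` raises `TypeError`, while `Annotated()` returns `list` and the base class returns `None`.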



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+  def __init__(self, batch_type, element_type):
+    self._batch_type = batch_type
+    self._element_type = element_type
+
+  def produce_batch(self, elements: Sequence[E]) -> B:
+    """Convert an instance of List[E] to a single instance of B."""
+    raise NotImplementedError
+
+  def explode_batch(self, batch: B) -> Iterator[E]:
+    """Convert an instance of B to Iterator[E]."""
+    raise NotImplementedError
+
+  def combine_batches(self, batches: Sequence[B]) -> B:
+    raise NotImplementedError
+
+  def get_length(self, batch: B) -> int:
+    raise NotImplementedError
+
+  @staticmethod
+  def register(batching_util_fn):
+    BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+    return batching_util_fn
+
+  @staticmethod
+  def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+    for constructor in BATCH_CONVERTER_REGISTRY:
+      result = constructor(element_type, batch_type)
+      if result is not None:
+        return result
+
+    # TODO: Include explanations for realistic candidate BatchConverter
+    raise TypeError(
+        f"Unable to find BatchConverter for element_type {element_type!r} and "
+        f"batch_type {batch_type!r}")
+
+  @property
+  def batch_type(self):
+    return self._batch_type
+
+  @property
+  def element_type(self):
+    return self._element_type
+
+
+N = "ARBITRARY LENGTH DIMENSION"
+
+
+class NumpyBatchConverter(BatchConverter):

Review Comment:
   Good point, done.
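
To make the four `BatchConverter` methods concrete, here is a minimal sketch of a converter whose batch type is simply a Python list. `ListBatchConverter` is a hypothetical name used only for illustration and is not part of the PR; the base class below is a trimmed copy of the interface quoted in the diff so the example runs on its own.

```python
from typing import Generic, Iterator, List, Sequence, TypeVar

B = TypeVar('B')
E = TypeVar('E')


class BatchConverter(Generic[B, E]):
  """Trimmed copy of the interface in the diff above."""
  def __init__(self, batch_type, element_type):
    self._batch_type = batch_type
    self._element_type = element_type

  def produce_batch(self, elements: Sequence[E]) -> B:
    raise NotImplementedError

  def explode_batch(self, batch: B) -> Iterator[E]:
    raise NotImplementedError

  def combine_batches(self, batches: Sequence[B]) -> B:
    raise NotImplementedError

  def get_length(self, batch: B) -> int:
    raise NotImplementedError


class ListBatchConverter(BatchConverter[List[E], E]):
  """Hypothetical converter where the batch type is List[E]."""
  def produce_batch(self, elements):
    return list(elements)

  def explode_batch(self, batch):
    return iter(batch)

  def combine_batches(self, batches):
    # Flatten the batches, preserving element order.
    return [element for batch in batches for element in batch]

  def get_length(self, batch):
    return len(batch)


converter = ListBatchConverter(batch_type=List[int], element_type=int)
batch = converter.produce_batch((1, 2, 3))
combined = converter.combine_batches([batch, [4, 5]])
```

A NumPy-backed converter would follow the same shape, swapping the list operations for array construction, iteration, and concatenation.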



##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+
+class BatchDoFn(beam.DoFn):
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+  def process_batch(self, batch: List[int], *args, **kwargs):
+    yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+  dofn: beam.DoFn
+  process_defined: bool

Review Comment:
   These are meant to be the expected values for the parameterized test below. 
Using a NamedTuple here was a bad idea; I switched the logic to use plain 
dictionaries, as intended.
   
   I might be missing your point, though.



##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+class BatchDoFnTestCase(NamedTuple):
+  dofn: beam.DoFn
+  process_defined: bool
+  process_batch_defined: bool
+  input_batch_type: Optional[type]
+  output_batch_type: Optional[type]
+
+
+def make_map_pardo():
+  return beam.Map(lambda x: x * 2).dofn
+
+
+@parameterized_class(
+    BatchDoFnTestCase.__annotations__.keys(),
+    [
+        BatchDoFnTestCase(
+            dofn=ElementDoFn(),
+            process_defined=True,
+            process_batch_defined=False,
+            input_batch_type=None,
+            output_batch_type=None),
+        BatchDoFnTestCase(
+            dofn=BatchDoFn(),
+            process_defined=False,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        BatchDoFnTestCase(
+            dofn=BatchDoFnNoReturnAnnotation(),
+            process_defined=False,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        BatchDoFnTestCase(
+            dofn=EitherDoFn(),
+            process_defined=True,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        #BatchDoFnTestCase(
+        #    dofn=make_map_pardo().dofn,
+        #    process_defined=True,
+        #    process_batch_defined=False,
+        #    input_batch_type=None,
+        #    batch_output_type=None),
+    ],
+    class_name_func=lambda _,

Review Comment:
   I broke this out as a free function
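
A free function for `class_name_func` might look like the sketch below. `parameterized_class` calls it with the decorated class, the case index, and the parameter dict. The function name `get_test_class_name` and the choice to build the name from the `dofn` entry are assumptions made here, not the PR's code, and the two placeholder classes exist only so the example runs without Beam.

```python
def get_test_class_name(cls, num, params_dict):
  """Build a readable class name from the case index and the DoFn class."""
  dofn = params_dict["dofn"]
  return f"{cls.__name__}_{num}_{type(dofn).__name__}"


class BatchDoFnTest:  # placeholder for the real unittest.TestCase subclass
  pass


class EitherDoFn:  # placeholder for the DoFn under test
  pass


name = get_test_class_name(BatchDoFnTest, 2, {"dofn": EitherDoFn()})
```

It would be passed as `class_name_func=get_test_class_name` in place of the inline lambda.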



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+  @staticmethod
+  def register(batching_util_fn):

Review Comment:
   Added typehint
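
For illustration, a typed registry could be sketched as follows. The alias name `BatchConverterConstructor` and the parameter name are assumptions, not necessarily what the PR ended up with; the argument order (element type first, then batch type) follows the call in `from_typehints` in the diff. `BatchConverter` here is an empty placeholder so the example is self-contained.

```python
from typing import Callable, List, Optional


class BatchConverter:
  """Placeholder for the class in the diff; only the registry is sketched."""


# A constructor inspects the two type hints and returns a converter, or None
# when it does not apply to them.
BatchConverterConstructor = Callable[[type, type], Optional[BatchConverter]]

BATCH_CONVERTER_REGISTRY: List[BatchConverterConstructor] = []


def register(
    batch_converter_constructor: BatchConverterConstructor
) -> BatchConverterConstructor:
  BATCH_CONVERTER_REGISTRY.append(batch_converter_constructor)
  # Returning the argument lets this be used as a decorator.
  return batch_converter_constructor


@register
def never_matches(element_type: type,
                  batch_type: type) -> Optional[BatchConverter]:
  return None
```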



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
   def infer_output_type(self, input_type):
     return self.fn.infer_output_type(input_type)
 
+  def infer_batch_converters(self, input_element_type):
+    # This assumes batch input implies batch output
+    # TODO: Define and handle yields_batches and yields_elements
+    if self.fn.process_batch_defined:
+      input_batch_type = self.fn.get_input_batch_type()
+
+      if input_batch_type is None:
+        raise TypeError(
+            f"process_batch method on {self.fn!r} does not have "
+            "an input type annotation")
+
+      output_batch_type = self.fn.get_output_batch_type()
+      if output_batch_type is None:
+        raise TypeError(
+            f"process_batch method on {self.fn!r} does not have "
+            "a return type annotation")
+
+      # Generate a batch converter to convert between the input type and the
+      # (batch) input type of process_batch
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=input_element_type, batch_type=input_batch_type)
+
+      # Generate a batch converter to convert between the output type and the
+      # (batch) output type of process_batch
+      output_element_type = self.infer_output_type(input_element_type)
+      self.fn.output_batch_converter = BatchConverter.from_typehints(
+          element_type=output_element_type, batch_type=output_batch_type)
+
+  def infer_output_batch_type(self):
+    # TODO: Handle process() with @yields_batch

Review Comment:
   Done - BEAM-14293



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+  @staticmethod
+  def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+    for constructor in BATCH_CONVERTER_REGISTRY:
+      result = constructor(element_type, batch_type)

Review Comment:
   I assume you're thinking we would catch the exceptions and potentially 
aggregate their messages to pass along to the user?
   
   That could make sense, but I worry it will get very noisy as the number of 
implementations grows. We might define a "near miss" concept to filter out just 
the messages that are potentially relevant. What do you think?
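
One possible shape for the "near miss" idea is sketched below, purely as a discussion aid. The `NearMiss` exception, the standalone `from_typehints` signature taking an explicit constructor list, and the two toy constructors are all hypothetical. The assumption is that a constructor returns `None` when the types are unrelated and raises `NearMiss` only when it recognizes the batch type but rejects a detail, so the final error lists just the relevant explanations.

```python
from typing import Callable, List


class NearMiss(TypeError):
  """Hypothetical: raised by a constructor that almost matched."""


def from_typehints(constructors: List[Callable], *, element_type, batch_type):
  near_misses = []
  for constructor in constructors:
    try:
      result = constructor(element_type, batch_type)
    except NearMiss as e:
      # Keep only messages the constructor itself flagged as relevant.
      near_misses.append(f"{constructor.__name__}: {e}")
      continue
    if result is not None:
      return result

  message = (
      f"Unable to find BatchConverter for element_type {element_type!r} and "
      f"batch_type {batch_type!r}")
  if near_misses:
    message += "\nNear misses:\n" + "\n".join(f"  {m}" for m in near_misses)
  raise TypeError(message)


def list_constructor(element_type, batch_type):
  if batch_type is not list:
    return None  # unrelated batch type: stay silent
  if element_type is not int:
    raise NearMiss(f"list batches only support int, got {element_type!r}")
  return "list-converter"  # stand-in for a real converter instance


def tuple_constructor(element_type, batch_type):
  return "tuple-converter" if batch_type is tuple else None
```

With this split, the noise grows with the number of near misses for a given pair of types rather than with the total number of registered implementations.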





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756782)
    Time Spent: 2h 10m  (was: 2h)

> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
>                 Key: BEAM-14213
>                 URL: https://issues.apache.org/jira/browse/BEAM-14213
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python 
> SDK.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
