[ 
https://issues.apache.org/jira/browse/BEAM-14213?focusedWorklogId=758802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758802
 ]

ASF GitHub Bot logged work on BEAM-14213:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/22 20:58
            Start Date: 19/Apr/22 20:58
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r853480454


##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+  def __init__(self, batch_type, element_type):
+    self._batch_type = batch_type
+    self._element_type = element_type
+
+  def produce_batch(self, elements: Sequence[E]) -> B:
+    """Convert an instance of List[E] to a single instance of B."""
+    raise NotImplementedError
+
+  def explode_batch(self, batch: B) -> Iterator[E]:
+    """Convert an instance of B to Iterator[E]."""
+    raise NotImplementedError
+
+  def combine_batches(self, batches: Sequence[B]) -> B:
+    raise NotImplementedError
+
+  def get_length(self, batch: B) -> int:
+    raise NotImplementedError
+
+  @staticmethod
+  def register(batching_util_fn):
+    BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+    return batching_util_fn
+
+  @staticmethod
+  def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+    for constructor in BATCH_CONVERTER_REGISTRY:
+      result = constructor(element_type, batch_type)

Review Comment:
   Yes. I would rather be too verbose than not verbose enough in the case of 
failure (e.g., a common error could simply be that a converter wasn't 
registered, so letting each one say "doesn't fit" would still be valuable 
information). 
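The reviewer's point can be illustrated with a minimal, hypothetical sketch. It is not the code from PR #17253. The names `numpy_int64_converter` and `NumpyInt64Converter`, the standalone `register`/`from_typehints` functions, and the convention that a registered constructor raises `TypeError` to explain why an (element_type, batch_type) pair doesn't fit are all assumptions made for illustration. The lookup records every rejection reason and states explicitly when no converter is registered, so the final error tells the user why nothing matched.

```python
from typing import Callable, Dict, Iterator, List, Sequence

import numpy as np

# Hypothetical registry: each constructor takes (element_type, batch_type)
# and either returns a converter or raises TypeError explaining the mismatch.
BATCH_CONVERTER_REGISTRY: List[Callable] = []


def register(constructor: Callable) -> Callable:
  BATCH_CONVERTER_REGISTRY.append(constructor)
  return constructor


class NumpyInt64Converter:
  """Hypothetical converter between int elements and 1-D int64 arrays."""

  def produce_batch(self, elements: Sequence[int]) -> np.ndarray:
    return np.array(elements, dtype=np.int64)

  def explode_batch(self, batch: np.ndarray) -> Iterator[int]:
    # Yield plain Python ints rather than numpy scalars.
    return (int(x) for x in batch)

  def combine_batches(self, batches: Sequence[np.ndarray]) -> np.ndarray:
    return np.concatenate(batches)

  def get_length(self, batch: np.ndarray) -> int:
    return len(batch)


@register
def numpy_int64_converter(element_type, batch_type):
  # Each check produces a specific, human-readable reason on failure.
  if element_type is not int:
    raise TypeError(f"element_type must be int, got {element_type!r}")
  if batch_type is not np.ndarray:
    raise TypeError(f"batch_type must be np.ndarray, got {batch_type!r}")
  return NumpyInt64Converter()


def from_typehints(*, element_type, batch_type):
  # Collect why every registered constructor rejected the pair instead of
  # discarding the reasons.
  errors: Dict[str, str] = {}
  for constructor in BATCH_CONVERTER_REGISTRY:
    try:
      return constructor(element_type, batch_type)
    except TypeError as e:
      errors[constructor.__name__] = str(e)
  if errors:
    details = "\n".join(f"  {name}: {msg}" for name, msg in errors.items())
  else:
    # The "converter wasn't registered" case gets its own explicit message.
    details = "  (no converters are registered)"
  raise TypeError(
      f"No BatchConverter found for element_type={element_type!r}, "
      f"batch_type={batch_type!r}. Reasons:\n{details}")
```

With this design, `from_typehints(element_type=str, batch_type=np.ndarray)` raises a `TypeError` whose message names `numpy_int64_converter` and its reason, and an empty registry produces a message saying so, which covers the failure mode the comment mentions.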
Issue Time Tracking
-------------------

    Worklog Id:     (was: 758802)
    Time Spent: 2h 20m  (was: 2h 10m)

> Add support for Batched DoFns in the Python SDK
> -----------------------------------------------
>
>                 Key: BEAM-14213
>                 URL: https://issues.apache.org/jira/browse/BEAM-14213
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Add an implementation for https://s.apache.org/batched-dofns to the Python 
> SDK.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
