TheNeuralBit commented on code in PR #23909:
URL: https://github.com/apache/beam/pull/23909#discussion_r1012230244
##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the
Dataflow Runner v2 backen
### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
For additional tips and troubleshooting information, see
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
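+from typing import Iterator
+
+import apache_beam as beam
+import numpy as np
+
+class MultiplyByTwo(beam.DoFn):
+  # Typehints on process_batch declare the expected batch type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64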
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo()) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo()) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo()) # Batches passed through
+   | beam.ParDo(MultiplyByTwo()))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is used in a
+pipeline, Beam will inspect these typehints to ensure that the input and output
+types are compatible, and to verify that it understands how to create instances
+of this type of batch (see [Supported Batch Types](#batched-dofn-types)).
+{{< /paragraph >}}
+
+
+### 14.2 Element-wise Fallback {#batched-dofn-elementwise}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+For some DoFns you may be able to provide both a batched and an element-wise
+implementation of your desired logic. You can do this by simply defining both
+`process` and `process_batch`:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    # Multiply an individual int64 by 2
+    yield element * 2
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    # Multiply a _batch_ of int64s by 2
+    yield batch * 2
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+When executing this DoFn, Beam will select the best implementation to use given
+the context. Generally, if the inputs to a DoFn are already batched, Beam will
+use the batched implementation, otherwise it will use the element-wise
+implementation defined in the `process` method.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Note that, in this case, there is no need to define `infer_output_type`. This is
+because Beam can get the output type from the typehint on `process`.
+{{< /paragraph >}}
+
+
+
+### 14.3 Batch Production vs. Batch Consumption {#batched-dofn-batch-production}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+By convention, Beam assumes that the `process_batch` method, which consumes
+batched inputs, will also produce batched outputs. Similarly, Beam assumes the
+`process` method will produce individual elements. This can be overridden with
+the [`@beam.DoFn.yields_elements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_elements) and
+[`@beam.DoFn.yields_batches`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_batches) decorators. For example:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+# Consumes elements, produces batches
+class ReadFromFile(beam.DoFn):
+
+  @beam.DoFn.yields_batches
+  def process(self, path: str) -> Iterator[np.ndarray]:
+    ...
+    yield array
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+
+# Consumes batches, produces elements
+class WriteToFile(beam.DoFn):
+  @beam.DoFn.yields_elements
+  def process_batch(self, batch: np.ndarray) -> Iterator[str]:
+    ...
+    yield output_path
+{{< /highlight >}}
+
+### 14.4 Supported Batch Types {#batched-dofn-types}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+So far we’ve used numpy types in these Batched DoFn implementations -
Review Comment:
I copied the discussion of numpy typehints up to the paragraph where we
direct users to this section. Do you think I should also remove it here?
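
For anyone reading along, here is a rough plain-Python model of the buffering, pass-through, and explosion behaviour that section 14.1 describes. It is only an illustration under stated assumptions, not Beam's implementation: `make_batches`, `explode`, `apply_batch_fn`, `run_batched`, and the batch size of 2 are all hypothetical, and plain lists stand in for numpy arrays.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List

Batch = List[int]


def make_batches(elements: Iterable[int], batch_size: int) -> Iterator[Batch]:
    """Buffer individual elements into batches (the last one may be smaller)."""
    it = iter(elements)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def explode(batches: Iterable[Batch]) -> Iterator[int]:
    """Turn batches back into individual elements."""
    for batch in batches:
        yield from batch


def multiply_by_two_batch(batch: Batch) -> Iterator[Batch]:
    # Hypothetical stand-in for MultiplyByTwo.process_batch.
    yield [x * 2 for x in batch]


def apply_batch_fn(fn: Callable[[Batch], Iterator[Batch]],
                   batches: Iterable[Batch]) -> Iterator[Batch]:
    """Apply one batch-consuming, batch-producing function to every batch."""
    for batch in batches:
        yield from fn(batch)


def run_batched(batches: Iterable[Batch],
                *batch_fns: Callable[[Batch], Iterator[Batch]]) -> Iterable[Batch]:
    """Chain batch functions; batches pass straight through between stages."""
    for fn in batch_fns:
        batches = apply_batch_fn(fn, batches)
    return batches


# Batch once on the way in, run three batched stages, explode once on the way out.
result = list(explode(run_batched(
    make_batches([1, 2, 3, 4], batch_size=2),
    multiply_by_two_batch, multiply_by_two_batch, multiply_by_two_batch)))
```

The point of the sketch is that batching and unbatching each happen once, at the edges of the chain, rather than around every stage.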