pcoet commented on code in PR #23909:
URL: https://github.com/apache/beam/pull/23909#discussion_r1011974873


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how

Review Comment:
   I _think_this is typically two words: `typehints` -> `type hints`. But it 
looks like the Beam module is `typehints`, and then we refer to them in the py 
docs as `type-hints`: 
https://beam.apache.org/releases/pydoc/current/apache_beam.typehints.html So 
that confuses things a little. I'd recommend two words, but not a big deal. 
Just calling this out once, but the word appears again below.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is ued in a

Review Comment:
   `ued` -> `used`



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is ued in a
+pipeline, Beam will inspect these typehints to ensure that the input and output
+types are compatible, and to verify that it understands how to create instances
+of this type of batch (see [Supported Batch Types](#batched-dofn-types)).
+{{< /paragraph >}}
+
+
+### 14.2 Element-wise Fallback {#batched-dofn-elementwise}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+For some DoFns you may be able to provide both a batched and an element-wise
+implementation of your desired logic. You can do this, by simply defining both
+`process` and `process_batch`:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    # Multiply an individual int64 by 2
+    yield batch * 2
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    # Multiply a _batch_ of int64s by 2
+    yield batch * 2
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+When executing this DoFn, Beam will select the best implementation to use given
+the context. Generally, if the inputs to a DoFn are already batched Beam will
+use the batched implementation, otherwise it will use the element-wise
+implementation defined in the `process` method.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Note that, in this case, there is no need to define `infer_output_type`. This 
is
+because Beam can get the output type from the typehint on `process`.
+{{< /paragraph >}}
+
+
+
+### 14.3 Batch Production vs. Batch Consumption 
{#batched-dofn-batch-production}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+By convention, Beam assumes that the `process_batch` method, which consumes
+batched inputs, will also produce batched outputs. Similarly, Beam assumes the
+`process` method will produce individual elements. This can be overridden with
+the 
[`@beam.DoFn.yields_elements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_elements)
 and
+[`@beam.DoFn.yields_batches`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_batches)
 decorators. For example:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+# Consumes elements, produces batches
+class ReadFromFile(beam.DoFn):
+
+  @beam.DoFn.yields_batches
+  def process(self, path: str) -> Iterator[np.ndarray]:
+    ...
+    yield array
+  
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self):
+    return np.int64
+
+# Consumes batches, produces elements
+class WriteToFile(beam.DoFn):
+  @beam.DoFn.yields_elements
+  def process(self, np.ndarray) -> Iterator[str]:
+    ...
+    yield output_path
+{{< /highlight >}}
+
+### 14.4 Supported Batch Types {#batched-dofn-types}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+So far we’ve used numpy types in these Batched DoFn implementations -
+`np.int64 ` as the element typehint and `np.ndarray` as the corresponding
+batch typehint - but Beam supports typehints from other libraries as well.
+{{< /paragraph >}}
+
+#### 
[numpy](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/batch.py)
+| Element Typehint | Batch Typehint |
+| ---------------- | -------------- |
+| Numeric types (`int`, `np.int32`, `bool`, ...) | np.ndarray (or NumpyArray) |
+
+#### 
[pandas](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/pandas_type_compatibility.py)
+| Element Typehint | Batch Typehint |
+| ---------------- | -------------- |
+| Numeric types (`int`, `np.int32`, `bool`, ...) | `pd.Series` |
+| `bytes` | |
+| `Any` | |
+| [Beam Schema Types](#schemas) | `pd.DataFrame` |
+
+#### 
[pyarrow](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/arrow_type_compatibility.py)
+Coming in Beam 2.44.0.

Review Comment:
   Let's only say this if we're _sure_ that this type hint will ship in 2.44. 
Otherwise, let's say in a future release or something...



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is ued in a
+pipeline, Beam will inspect these typehints to ensure that the input and output
+types are compatible, and to verify that it understands how to create instances
+of this type of batch (see [Supported Batch Types](#batched-dofn-types)).
+{{< /paragraph >}}
+
+
+### 14.2 Element-wise Fallback {#batched-dofn-elementwise}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+For some DoFns you may be able to provide both a batched and an element-wise
+implementation of your desired logic. You can do this, by simply defining both
+`process` and `process_batch`:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    # Multiply an individual int64 by 2
+    yield batch * 2
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    # Multiply a _batch_ of int64s by 2
+    yield batch * 2
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+When executing this DoFn, Beam will select the best implementation to use given
+the context. Generally, if the inputs to a DoFn are already batched Beam will
+use the batched implementation, otherwise it will use the element-wise

Review Comment:
   `implementation, otherwise` -> `implementation; otherwise`. I believe this 
is right. No one actually knows how to use semicolons. :-)



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that

Review Comment:
   `modular, composable, components` -> `modular, composable components`



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how

Review Comment:
   Would it be useful to add a sentence calling out the type hint in the code 
snippet(`.with_output_types(np.int64)`)? I thought that Python type hints were 
something else (https://docs.python.org/3/library/typing.html), and I had to 
read more to realize that the type hint is provided by this method and arg.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is ued in a
+pipeline, Beam will inspect these typehints to ensure that the input and output
+types are compatible, and to verify that it understands how to create instances
+of this type of batch (see [Supported Batch Types](#batched-dofn-types)).
+{{< /paragraph >}}
+
+
+### 14.2 Element-wise Fallback {#batched-dofn-elementwise}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+For some DoFns you may be able to provide both a batched and an element-wise
+implementation of your desired logic. You can do this, by simply defining both
+`process` and `process_batch`:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    # Multiply an individual int64 by 2
+    yield batch * 2
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    # Multiply a _batch_ of int64s by 2
+    yield batch * 2
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+When executing this DoFn, Beam will select the best implementation to use given
+the context. Generally, if the inputs to a DoFn are already batched Beam will
+use the batched implementation, otherwise it will use the element-wise
+implementation defined in the `process` method.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Note that, in this case, there is no need to define `infer_output_type`. This 
is
+because Beam can get the output type from the typehint on `process`.
+{{< /paragraph >}}
+
+
+
+### 14.3 Batch Production vs. Batch Consumption 
{#batched-dofn-batch-production}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+By convention, Beam assumes that the `process_batch` method, which consumes
+batched inputs, will also produce batched outputs. Similarly, Beam assumes the
+`process` method will produce individual elements. This can be overridden with
+the 
[`@beam.DoFn.yields_elements`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_elements)
 and
+[`@beam.DoFn.yields_batches`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn.yields_batches)
 decorators. For example:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+# Consumes elements, produces batches
+class ReadFromFile(beam.DoFn):
+
+  @beam.DoFn.yields_batches
+  def process(self, path: str) -> Iterator[np.ndarray]:
+    ...
+    yield array
+  
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self):
+    return np.int64
+
+# Consumes batches, produces elements
+class WriteToFile(beam.DoFn):
+  @beam.DoFn.yields_elements
+  def process(self, np.ndarray) -> Iterator[str]:
+    ...
+    yield output_path
+{{< /highlight >}}
+
+### 14.4 Supported Batch Types {#batched-dofn-types}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+So far we’ve used numpy types in these Batched DoFn implementations -

Review Comment:
   Consider adding these comments to the appropriate sections above: "-
   `np.int64 ` as the element typehint and `np.ndarray` as the corresponding
   batch typehint -". By the time we get here, the reader has to go back and 
look at the code. And conversely, calling out these type hints right after the 
code snippets would help users understand them.
   
   Also, you should be able to use `&ndash;` instead of `-` for readability 
(not to look like a minus sign).



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -7799,3 +7799,246 @@ Dataflow supports multi-language pipelines through the 
Dataflow Runner v2 backen
 ### 13.4 Tips and Troubleshooting {#x-lang-transform-tips-troubleshooting}
 
 For additional tips and troubleshooting information, see 
[here](https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips).
+
+## 14 Batched DoFns {#batched-dofns}
+{{< language-switcher java py go typescript >}}
+
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+Batched DoFns enable users to create modular, composable, components that
+operate on batches of multiple logical elements. These DoFns can leverage
+vectorized Python libraries, like numpy, scipy, and pandas, which operate on
+batches of data for efficiency.
+{{< /paragraph >}}
+
+### 14.1 Basics {#batched-dofn-basics}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+A trivial Batched DoFn might look like this:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+class MultiplyByTwo(beam.DoFn):
+  # Type
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 2
+
+  # Declare what the element-wise output type is
+  def infer_output_type(self, input_element_type):
+    return np.int64
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+This DoFn can be used in a Beam pipeline that otherwise operates on individual
+elements. Beam will implicitly buffer elements and create numpy arrays on the
+input side, and on the output side it will explode the numpy arrays back into
+individual elements:
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.Map(lambda x: x/3))  # Implicit batch explosion
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+However, if Batched DoFns with equivalent types are chained together, this
+batching and unbatching will be elided. The batches will be passed straight
+through! This makes it much simpler to compose transforms that operate on
+batches.
+{{< /paragraph >}}
+
+{{< highlight py >}}
+(p | beam.Create([1,2,3,4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTwo) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTwo) # Batches passed through
+   | beam.ParDo(MultiplyByTwo))
+{{< /highlight >}}
+
+{{< paragraph class="language-py" >}}
+Note that the typehints on the Batched DoFn are *critical*. This is how
+a Batched DoFn declares what batch type it expects. When this DoFn is ued in a
+pipeline, Beam will inspect these typehints to ensure that the input and output
+types are compatible, and to verify that it understands how to create instances
+of this type of batch (see [Supported Batch Types](#batched-dofn-types)).
+{{< /paragraph >}}
+
+
+### 14.2 Element-wise Fallback {#batched-dofn-elementwise}
+{{< paragraph class="language-go language-java language-typescript" >}}
+Batched DoFns are currently a Python-only feature.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+For some DoFns you may be able to provide both a batched and an element-wise
+implementation of your desired logic. You can do this, by simply defining both

Review Comment:
   `You can do this, by` -> `You can do this by`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to