damccorm commented on code in PR #30284:
URL: https://github.com/apache/beam/pull/30284#discussion_r1490094353


##########
learning/prompts/documentation-lookup/31_advanced_batched_dofn.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+What is a batched 'DoFn' in Apache Beam?
+
+Response:
+In Apache Beam, a [batched `DoFn`](https://beam.apache.org/documentation/programming-guide/#batched-dofns) is a specialized type of [`DoFn`](https://beam.apache.org/documentation/programming-guide/#pardo) that operates on batches of logical elements within a [`PCollection`](https://beam.apache.org/documentation/basics/#pcollection). Unlike regular `DoFn`s, which process each element individually, batched `DoFn`s enable the efficient processing of multiple elements together as batches.
+
+Batched `DoFn`s are currently only available in the Apache Beam Python SDK. They leverage powerful vectorized Python libraries, such as [NumPy](https://numpy.org/), [SciPy](https://scipy.org/), and [pandas](https://pandas.pydata.org/), which are designed for analysis and computing at scale.
+Batched `DoFn`s are particularly useful when dealing with large amounts of data, allowing for parallel and optimized processing. Additionally, you can chain multiple batched `DoFn`s together to create a more efficient data processing pipeline.
+
+To implement a batched `DoFn`, you define a [`process_batch`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process_batch) method instead of the typical [`process`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process) method used in regular `DoFn`s. The `process_batch` method takes a batch of elements as input and produces a batch of elements as an output. It is important to note that batches must have a single set of timing properties (event time, window, etc.) for every logical element within the batch; batches cannot span multiple timestamps.
+For some `DoFn`s, you can provide both a batched and an element-wise implementation of your desired logic by defining both `process` and `process_batch` methods.
+
+Here is a simple example of a flexible `DoFn` with both batched and element-wise implementations.
+
+In the example below, when `MultiplyByTen` is applied to a `PCollection`, Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements:
+
+```python
+class MultiplyByTen(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    yield element * 10
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 10
+```
+In this example, the `np.int64` type represents the individual element. The `process` method multiplies an element by ten, yielding a single element. The `np.ndarray` type represents the batch. The `process_batch` method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.

Review Comment:
   ```suggestion
   
   In this example, the `np.int64` type represents the individual element. The `process` method multiplies an element by ten, yielding a single element. The `np.ndarray` type represents the batch. The `process_batch` method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.
   ```
   
   nit



##########
learning/prompts/documentation-lookup/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,62 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or more extra [`PCollection`s](https://beam.apache.org/documentation/basics/#pcollection) produced by a single [`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo) transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection`s and return them bundled together with the main output `PCollection`.
+
+Additional outputs serve as a mechanism to implement [pipeline 
branching](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections).
 You can use them when there is a need to split the output of a single 
transform into several `PCollection`s or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the input `PCollection` just once.
+
+Producing additional outputs requires 
[tagging](https://beam.apache.org/documentation/programming-guide/#output-tags) 
each output `PCollection` with a unique identifier, which is then used to 
[emit](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn)
 elements to the corresponding output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag`s for each of the outputs, the tags 
are passed to the `ParDo` using the `.withOutputTags` method. You can find a 
sample Apache Beam Java pipeline that applies one transform to output two 
`PCollection`s in the [Branching 
`PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs)
 section in the Apache Beam documentation.
+
+The following Java code implements two additional output `PCollection`s for 
string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));
+
+```
+
+The `processElement` method can emit elements to the main output or any additional output through the `MultiOutputReceiver` object: calling its `get` method with an output tag returns the receiver for that output, and calling `output` on that receiver emits the element.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+  if (condition for main output) {
+    // Emit element to main output
+    out.get(mainOutputTag).output(word);
+  } else {
+    // Emit element to additional string output
+    out.get(additionalOutputTagString).output(word);
+  }
+  if (condition for additional integer output) {
+    // Emit element to additional integer output
+    out.get(additionalOutputTagIntegers).output(word.length());
+  }
+}
+```
+
+In the Apache Beam Python SDK, you can implement additional outputs by invoking the `with_outputs()` method on the `ParDo` and specifying the expected tags for the multiple outputs. The method returns a `DoOutputsTuple` object, with the specified tags serving as attributes that provide access to the corresponding output `PCollection`s. For a sample Apache Beam Python pipeline demonstrating a word count example with multiple outputs, refer to the [multiple output `ParDo`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py) example in the Apache Beam GitHub repository.

Review Comment:
   Can we inline this example similar to the Java example? I don't think the 
training can follow the link, so it wouldn't be able to formulate an example
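   
   For reference, here is a minimal sketch of what an inlined Python example could look like. It is illustrative only and not taken from the linked cookbook example: the `SplitWords` DoFn, the tag names, and the length threshold are all hypothetical.
   
   ```python
   import apache_beam as beam
   
   class SplitWords(beam.DoFn):
     def process(self, word):
       if len(word) <= 3:
         # Short words go to the main (untagged) output.
         yield word
       else:
         # Long words go to a tagged additional output.
         yield beam.pvalue.TaggedOutput('long_words', word)
       # The length of each word goes to a second tagged additional output.
       yield beam.pvalue.TaggedOutput('lengths', len(word))
   
   with beam.Pipeline() as p:
     results = (
         p
         | beam.Create(['an', 'example', 'of', 'additional', 'outputs'])
         | beam.ParDo(SplitWords()).with_outputs(
             'long_words', 'lengths', main='short_words'))
   
     # The returned DoOutputsTuple exposes each output by tag,
     # either as an attribute or by key.
     short_words = results.short_words
     long_words = results.long_words
     lengths = results['lengths']
   ```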



##########
learning/prompts/documentation-lookup/31_advanced_batched_dofn.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+What is a batched 'DoFn' in Apache Beam?
+
+Response:
+In Apache Beam, a [batched `DoFn`](https://beam.apache.org/documentation/programming-guide/#batched-dofns) is a specialized type of [`DoFn`](https://beam.apache.org/documentation/programming-guide/#pardo) that operates on batches of logical elements within a [`PCollection`](https://beam.apache.org/documentation/basics/#pcollection). Unlike regular `DoFn`s, which process each element individually, batched `DoFn`s enable the efficient processing of multiple elements together as batches.
+
+Batched `DoFn`s are currently only available in the Apache Beam Python SDK. They leverage powerful vectorized Python libraries, such as [NumPy](https://numpy.org/), [SciPy](https://scipy.org/), and [pandas](https://pandas.pydata.org/), which are designed for analysis and computing at scale.
+Batched `DoFn`s are particularly useful when dealing with large amounts of data, allowing for parallel and optimized processing. Additionally, you can chain multiple batched `DoFn`s together to create a more efficient data processing pipeline.
+
+To implement a batched `DoFn`, you define a [`process_batch`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process_batch) method instead of the typical [`process`](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process) method used in regular `DoFn`s. The `process_batch` method takes a batch of elements as input and produces a batch of elements as an output. It is important to note that batches must have a single set of timing properties (event time, window, etc.) for every logical element within the batch; batches cannot span multiple timestamps.
+For some `DoFn`s, you can provide both a batched and an element-wise implementation of your desired logic by defining both `process` and `process_batch` methods.
+
+Here is a simple example of a flexible `DoFn` with both batched and element-wise implementations.
+
+In the example below, when `MultiplyByTen` is applied to a `PCollection`, Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements:
+
+```python
+class MultiplyByTen(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    yield element * 10
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 10
+```
+In this example, the `np.int64` type represents the individual element. The `process` method multiplies an element by ten, yielding a single element. The `np.ndarray` type represents the batch. The `process_batch` method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.
+
+By default, Apache Beam implicitly buffers elements and creates batches on the input side, then explodes batches back into individual elements on the output side. However, if batched `DoFn`s with equivalent types are chained together, this batch creation and explosion process is skipped, and the batches are passed through for more efficient processing.
+
+Here is an example with chained `DoFn`s of equivalent types:
+
+```python
+(p | beam.Create([1, 2, 3, 4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTen()) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTen()) # Batches passed through
+   | beam.ParDo(MultiplyByTen()))
+```
+In this example, the `PTransform.with_output_types` method sets the element-wise type hint for the output. Thus, when the `MultiplyByTen` class is applied to a `PCollection`, Apache Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.

Review Comment:
   ```suggestion
   
   In this example, the `PTransform.with_output_types` method sets the element-wise type hint for the output. Thus, when the `MultiplyByTen` class is applied to a `PCollection`, Apache Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.
   ```


