This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eaaf328fce4 Duet AI advanced DoFn documentation lookup prompts (#30284)
eaaf328fce4 is described below

commit eaaf328fce4d894011a3977c969dbbde82134a71
Author: Oleh Borysevych <[email protected]>
AuthorDate: Thu Feb 15 15:09:56 2024 +0100

    Duet AI advanced DoFn documentation lookup prompts (#30284)
    
    * adding advanced dofn prompts
    
    * whitespace warning fixed
    
    * Update 30_advanced_additional_outputs.md
    
    * Update 29_advanced_side_inputs.md
    
    * Update 30_advanced_additional_outputs.md
    
    * Update 30_advanced_additional_outputs.md
    
    * Update 29_advanced_side_inputs.md
    
    * Update 30_advanced_additional_outputs.md
    
    fix whitespace
    
    * code review comments
    
    * added style guide changes
    
    * external code review comments
    
    * whitespace fix
    
    ---------
    
    Co-authored-by: Daria Bezkorovaina 
<[email protected]>
    Co-authored-by: Andrey Devyatkin <[email protected]>
---
 .../29_advanced_side_inputs.md                     |  36 +++++++
 .../30_advanced_additional_outputs.md              | 108 +++++++++++++++++++++
 .../31_advanced_batched_dofn.md                    |  43 ++++++++
 3 files changed, 187 insertions(+)

diff --git a/learning/prompts/documentation-lookup/29_advanced_side_inputs.md 
b/learning/prompts/documentation-lookup/29_advanced_side_inputs.md
new file mode 100644
index 00000000000..128b99ea255
--- /dev/null
+++ b/learning/prompts/documentation-lookup/29_advanced_side_inputs.md
@@ -0,0 +1,36 @@
+Prompt:
+What are side inputs in Apache Beam?
+
+Response:
+In Apache Beam, side inputs are additional inputs that can be provided to a 
[`ParDo` 
transform](https://beam.apache.org/documentation/programming-guide/#pardo), in 
addition to the main input  
[`PCollection`](https://beam.apache.org/documentation/basics/#pcollection). 
These side inputs can be used by the `ParDo` transform's `DoFn` to access and 
read additional data during the processing of each element in the main input 
`PCollection`.
+
+Side inputs are particularly useful when a Beam pipeline needs to ingest 
additional data dynamically at runtime when processing each element, rather 
than relying on preset or hard-coded values. By leveraging side inputs, Apache 
Beam can determine additional data based on the main `PCollection` data or even 
from a different branch of your pipeline. For example, you can leverage side 
inputs for your streaming analytics use cases, enriching streaming events with 
additional data retrieved fr [...]
+
+In the Apache Beam Python SDK, you can provide side inputs as extra arguments 
in the `DoFn`'s `process` method or the `Map`/`FlatMap` transforms. The Python 
SDK supports optional, positional, and keyword arguments.
+
+```python
+class MyDoFn(beam.DoFn):
+  def process(self, element, side_input):
+    ...
+```
+
+In the Apache Beam Java SDK, side inputs are provided to the `ParDo` transform 
by using the `.withSideInputs` method. To access a side input from within a 
`DoFn`, the `DoFn.ProcessContext.sideInput` method is invoked.
+
+```java
+PCollection<Integer> input = ...;
+PCollectionView<Integer> sideInput = ...;
+PCollection<Integer> output =
+    input.apply(ParDo.of(new DoFn<Integer, Integer>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        Integer sideInputValue = c.sideInput(sideInput);
+        ...
+      }
+    }).withSideInputs(sideInput));
+```
+
+Side inputs can also be used to process windowed data. Apache Beam uses the 
windows for the main input elements to look up the appropriate windows for the 
side input elements. Apache Beam projects the main input’s window to the side 
input's window set and uses the side input from the resulting window. Main and 
side inputs might have identical or different windowing strategies.
+
+For example, if the main input `PCollection` is windowed into ten-minute 
windows, and the side input is windowed into one-hour windows, Apache Beam 
projects the main input window set and selects the value from the corresponding 
one-hour side input window. For more information, refer to [side inputs and 
windowing](https://beam.apache.org/documentation/programming-guide/#side-inputs-windowing).
+
+For common patterns and code samples to implement side inputs in Apache Beam 
pipelines, refer to [side input 
patterns](https://beam.apache.org/documentation/patterns/side-inputs/).
diff --git 
a/learning/prompts/documentation-lookup/30_advanced_additional_outputs.md 
b/learning/prompts/documentation-lookup/30_advanced_additional_outputs.md
new file mode 100644
index 00000000000..9ca5a9e6410
--- /dev/null
+++ b/learning/prompts/documentation-lookup/30_advanced_additional_outputs.md
@@ -0,0 +1,108 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, 
refer to one or multiple extra 
[`PCollection`s](https://beam.apache.org/documentation/basics/#pcollection) 
produced by a single 
[`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo) 
transform in addition to the main output `PCollection`. The `ParDo` transform 
has the flexibility to produce any number of additional output `PCollection`s 
and return them bundled together with the main output `PColl [...]
+
+Additional outputs serve as a mechanism to implement [pipeline 
branching](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections).
 You can use them when there is a need to split the output of a single 
transform into several `PCollection`s or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the i [...]
+
+Producing additional outputs requires 
[tagging](https://beam.apache.org/documentation/programming-guide/#output-tags) 
each output `PCollection` with a unique identifier, which is then used to 
[emit](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn)
 elements to the corresponding output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag`s for each of the outputs, the tags 
are passed to the `ParDo` using the `.withOutputTags` method. You can find a 
sample Apache Beam Java pipeline that applies one transform to output two 
`PCollection`s in the [Branching 
`PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipe
 [...]
+
+The following Java code implements two additional output `PCollection`s for 
string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+  PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new 
TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));
+
+```
+
+The `processElement` method can emit elements to the main output or any 
additional output by invoking the output method on the `MultiOutputReceiver` 
object. The output method takes the tag of the output and the element to be 
emitted as arguments.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+       if (condition for main output) {
+         // Emit element to main output
+         out.get(mainOutputTag).output(word);
+       } else {
+         // Emit element to additional string output
+         out.get(additionalOutputTagString).output(word);
+       }
+       if (condition for additional integer output) {
+         // Emit element to additional integer output
+         out.get(additionalOutputTagIntegers).output(word.length());
+       }
+     }
+```
+
+In the Apache Beam Python SDK, you can implement additional outputs by 
invoking the `with_outputs()` method on the `ParDo` and specifying the expected 
tags for the multiple outputs.
+
+The following Python code demonstrates how to implement additional outputs for 
a `ParDo` transform that outputs two `PCollection`s of strings and integers in 
addition to the main output `PCollection` of strings:
+
+```python
+class SplitLinesToWordsFn(beam.DoFn):
+
+  # These tags will be used to tag the outputs of this DoFn.
+  OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+
+  def process(self, element):
+    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged 
collection.
+    yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
+
+    words = re.findall(r'[A-Za-z\']+', element)
+    for word in words:
+      if len(word) <= 3:
+        # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged 
collection.
+        yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
+      else:
+        # yield word to add it to the main collection.
+        yield word
+```
+
+The method returns a `DoOutputsTuple` object, with the specified tags serving 
as attributes that provide `ParDo` with access to the corresponding output 
`PCollection`s.
+
+```python
+with beam.Pipeline(options=pipeline_options) as p:
+
+    lines = p | ReadFromText(known_args.input)
+
+    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+    split_lines_result = (
+        lines
+        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+            main='words'))
+
+    # split_lines_result is an object of type DoOutputsTuple
+    words, _, _ = split_lines_result
+    short_words = 
split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+    character_count = split_lines_result.tag_character_count
+```
+
+Refer to the [multiple output 
`ParDo`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py)
 for the complete example code.
+
+For more information about additional outputs and sample pipelines 
demonstrating their use, you can refer to the Apache Beam documentation on:
+* [Tagging multiple 
outputs](https://beam.apache.org/documentation/programming-guide/#output-tags)
+* [Emitting elements to multiple 
outputs](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn)
+* [Accessing additional parameters in your 
`DoFn`](https://beam.apache.org/documentation/programming-guide/#other-dofn-parameters)
+* [Branching 
`PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections)
diff --git a/learning/prompts/documentation-lookup/31_advanced_batched_dofn.md 
b/learning/prompts/documentation-lookup/31_advanced_batched_dofn.md
new file mode 100644
index 00000000000..0828096815f
--- /dev/null
+++ b/learning/prompts/documentation-lookup/31_advanced_batched_dofn.md
@@ -0,0 +1,43 @@
+Prompt:
+What is a batched 'DoFn' in Apache Beam?
+
+Response:
+In Apache Beam, a [batched 
‘DoFn’](https://beam.apache.org/documentation/programming-guide/#batched-dofns) 
is a specialized type of 
[‘DoFn’](https://beam.apache.org/documentation/programming-guide/#pardo) that 
operates on batches of logical elements within a 
[‘PCollection’](https://beam.apache.org/documentation/basics/#pcollection). 
Unlike regular ‘DoFn’s, which process each element individually, batched 
‘DoFn’s enable the efficient processing of multiple elements together as 
batches.
+
+Batched `DoFns` are currently only available in the Apache Beam Python SDK. 
They leverage powerful Python vectorized libraries, such as 
[NumPy](https://numpy.org/), [SciPy](https://scipy.org/), and 
[pandas](https://pandas.pydata.org/), which are designed for analysis and 
computing at scale.
+Batched 'DoFn's are particularly useful when dealing with large amounts of 
data, allowing for parallel and optimized processing. Additionally, you can 
chain multiple batched ‘DoFn’s together to create a more efficient data 
processing pipeline.
+
+To implement a batched ‘DoFn’, you define a 
[‘process_batch’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process_batch)
 method instead of the typical 
[‘process’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process)
 method used in regular ‘DoFn’s. The ‘process_batch’ method takes a batch of 
elements as input and produces a batch of elements as an output. It is 
important to note that batche [...]
+For some ‘DoFn’s, you can provide both a batched and an element-wise 
implementation of your desired logic by defining both ‘process’ and 
‘process_batch’ methods.
+
+Here is a simple example of a flexible ‘DoFn’ with both batched and 
element-wise implementations:
+
+In example below, when `MultiplyByTen` is applied to a `PCollection`, Beam 
recognizes that `np.ndarray` is an acceptable batch type to use in conjunction 
with `np.int64` elements.
+
+```python
+class MultiplyByTen(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    yield element * 10
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 10
+```
+
+In this example, the ‘np.int64’ type represents the individual element. The 
'process' method multiplies an element by ten, yielding a single element. The 
‘np.ndarray’ type represents the batch. The 'process_batch' method multiplies 
each element in the batch by ten, yielding a single batch. During pipeline 
execution, Apache Beam will automatically select the best implementation based 
on the context.
+
+By default, Apache Beam implicitly buffers elements and creates batches on the 
input side, then explodes batches back into individual elements on the output 
side. However, if batched 'DoFn's with equivalent types are chained together, 
this batch creation and explosion process is skipped, and the batches are 
passed through for more efficient processing.
+
+Here’s an example with chained ‘DoFn’s of equivalent types:
+
+```python
+(p | beam.Create([1, 2, 3, 4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTen()) # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTen()) # Batches passed through
+   | beam.ParDo(MultiplyByTen()))
+```
+
+In this example, the ‘PTransform.with_output_types’ method sets the 
element-wise typehint for the output. Thus, when the `MultiplyByTen` class is 
applied to a `PCollection`, Apache Beam recognizes that `np.ndarray` is an 
acceptable batch type to use in conjunction with `np.int64` elements.
+
+The ‘process_batch’ method is expected to produce batched outputs, while the 
‘process’ method should produce individual elements. You can customize this 
production logic with the ‘@beam.DoFn.yields_elements’ and 
‘@beam.DoFn.yields_batches` decorators.
+
+For more details, including advanced usage and code samples, you can refer to 
the [Apache Beam documentation on batched 
‘DoFn’s](https://beam.apache.org/documentation/programming-guide/#batched-dofn-types).
\ No newline at end of file

Reply via email to