damccorm commented on code in PR #30284: URL: https://github.com/apache/beam/pull/30284#discussion_r1490094353
##########
learning/prompts/documentation-lookup/31_advanced_batched_dofn.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+What is a batched 'DoFn' in Apache Beam?
+
+Response:
+In Apache Beam, a [batched ‘DoFn’](https://beam.apache.org/documentation/programming-guide/#batched-dofns) is a specialized type of [‘DoFn’](https://beam.apache.org/documentation/programming-guide/#pardo) that operates on batches of logical elements within a [‘PCollection’](https://beam.apache.org/documentation/basics/#pcollection). Unlike regular ‘DoFn’s, which process each element individually, batched ‘DoFn’s enable the efficient processing of multiple elements together as batches.
+
+Batched `DoFns` are currently only available in the Apache Beam Python SDK. They leverage powerful Python vectorized libraries, such as [NumPy](https://numpy.org/), [SciPy](https://scipy.org/), and [pandas](https://pandas.pydata.org/), which are designed for analysis and computing at scale.
+Batched 'DoFn's are particularly useful when dealing with large amounts of data, allowing for parallel and optimized processing. Additionally, you can chain multiple batched ‘DoFn’s together to create a more efficient data processing pipeline.
+
+To implement a batched ‘DoFn’, you define a [‘process_batch’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process_batch) method instead of the typical [‘process’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process) method used in regular ‘DoFn’s. The ‘process_batch’ method takes a batch of elements as input and produces a batch of elements as an output. It is important to note that batches must have a single set of timing properties (event time, window, etc.) for every logical element within the batch; batches cannot span multiple timestamps.
+For some ‘DoFn’s, you can provide both a batched and an element-wise implementation of your desired logic by defining both ‘process’ and ‘process_batch’ methods.
+
+Here is a simple example of a flexible ‘DoFn’ with both batched and element-wise implementations:
+
+In the example below, when `MultiplyByTen` is applied to a `PCollection`, Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.
+
+```python
+class MultiplyByTen(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    yield element * 10
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 10
+```
+In this example, the ‘np.int64’ type represents the individual element. The 'process' method multiplies an element by ten, yielding a single element. The ‘np.ndarray’ type represents the batch. The 'process_batch' method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.

Review Comment:
```suggestion
In this example, the ‘np.int64’ type represents the individual element. The 'process' method multiplies an element by ten, yielding a single element. The ‘np.ndarray’ type represents the batch. The 'process_batch' method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.
```
nit



##########
learning/prompts/documentation-lookup/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,62 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra [`PCollection`s](https://beam.apache.org/documentation/basics/#pcollection) produced by a single [`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo) transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection`s and return them bundled together with the main output `PCollection`.
+
+Additional outputs serve as a mechanism to implement [pipeline branching](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections). You can use them when there is a need to split the output of a single transform into several `PCollection`s or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once.
+
+Producing additional outputs requires [tagging](https://beam.apache.org/documentation/programming-guide/#output-tags) each output `PCollection` with a unique identifier, which is then used to [emit](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn) elements to the corresponding output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag`s for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. You can find a sample Apache Beam Java pipeline that applies one transform to output two `PCollection`s in the [Branching `PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs) section in the Apache Beam documentation.
+
+The following Java code implements two additional output `PCollection`s for string and integer values in addition to the main output `PCollection` of strings:
+
+```java
+// Input PCollection that contains strings.
+PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+    .of(new DoFn<String, String>() {
+      // DoFn continues here.
+      ...
+    })
+    // Specify the tag for the main output.
+    .withOutputTags(mainOutputTag,
+        // Specify the tags for the two additional outputs as a TupleTagList.
+        TupleTagList.of(additionalOutputTagString)
+            .and(additionalOutputTagIntegers)));
+
+```
+
+The `processElement` method can emit elements to the main output or any additional output by invoking the output method on the `MultiOutputReceiver` object. The output method takes the tag of the output and the element to be emitted as arguments.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+  if (condition for main output) {
+    // Emit element to main output
+    out.get(mainOutputTag).output(word);
+  } else {
+    // Emit element to additional string output
+    out.get(additionalOutputTagString).output(word);
+  }
+  if (condition for additional integer output) {
+    // Emit element to additional integer output
+    out.get(additionalOutputTagIntegers).output(word.length());
+  }
+}
+```
+
+In the Apache Beam Python SDK, you can implement additional outputs by invoking the `with_outputs()` method on the `ParDo` and specifying the expected tags for the multiple outputs. The method returns a `DoOutputsTuple` object, with the specified tags serving as attributes that provide `ParDo` with access to the corresponding output `PCollection`s. For a sample Apache Beam Python pipeline demonstrating a word count example with multiple outputs, refer to the [multiple output `ParDo`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py) example in the Apache Beam GitHub repository.

Review Comment:
Can we inline this example similar to the Java example? I don't think the training can follow the link, so it wouldn't be able to formulate an example
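For reference, a minimal sketch of the kind of inlined Python example this comment asks for, using the `with_outputs()` and `pvalue.TaggedOutput` APIs described in the quoted text. The `SplitWords` DoFn, its tag names, and the sample data below are illustrative and are not taken from the linked cookbook example:

```python
import apache_beam as beam
from apache_beam import pvalue


class SplitWords(beam.DoFn):
    # Illustrative DoFn: short words go to the main output, long words go to a
    # tagged output, and every word's length goes to a second tagged output.
    def process(self, word):
        if len(word) <= 5:
            yield word  # Main output.
        else:
            yield pvalue.TaggedOutput('long_words', word)
        yield pvalue.TaggedOutput('lengths', len(word))


with beam.Pipeline() as pipeline:
    words = pipeline | beam.Create(['beam', 'pipeline', 'dataflow', 'sdk'])
    results = words | beam.ParDo(SplitWords()).with_outputs(
        'long_words', 'lengths', main='short_words')
    # The returned DoOutputsTuple exposes each output PCollection as an attribute.
    short_words = results.short_words
    long_words = results.long_words
    lengths = results.lengths
```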
##########
learning/prompts/documentation-lookup/31_advanced_batched_dofn.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+What is a batched 'DoFn' in Apache Beam?
+
+Response:
+In Apache Beam, a [batched ‘DoFn’](https://beam.apache.org/documentation/programming-guide/#batched-dofns) is a specialized type of [‘DoFn’](https://beam.apache.org/documentation/programming-guide/#pardo) that operates on batches of logical elements within a [‘PCollection’](https://beam.apache.org/documentation/basics/#pcollection). Unlike regular ‘DoFn’s, which process each element individually, batched ‘DoFn’s enable the efficient processing of multiple elements together as batches.
+
+Batched `DoFns` are currently only available in the Apache Beam Python SDK. They leverage powerful Python vectorized libraries, such as [NumPy](https://numpy.org/), [SciPy](https://scipy.org/), and [pandas](https://pandas.pydata.org/), which are designed for analysis and computing at scale.
+Batched 'DoFn's are particularly useful when dealing with large amounts of data, allowing for parallel and optimized processing. Additionally, you can chain multiple batched ‘DoFn’s together to create a more efficient data processing pipeline.
+
+To implement a batched ‘DoFn’, you define a [‘process_batch’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process_batch) method instead of the typical [‘process’](https://beam.apache.org/releases/pydoc/current/_modules/apache_beam/transforms/core.html#DoFn.process) method used in regular ‘DoFn’s. The ‘process_batch’ method takes a batch of elements as input and produces a batch of elements as an output. It is important to note that batches must have a single set of timing properties (event time, window, etc.) for every logical element within the batch; batches cannot span multiple timestamps.
+For some ‘DoFn’s, you can provide both a batched and an element-wise implementation of your desired logic by defining both ‘process’ and ‘process_batch’ methods.
+
+Here is a simple example of a flexible ‘DoFn’ with both batched and element-wise implementations:
+
+In the example below, when `MultiplyByTen` is applied to a `PCollection`, Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.
+
+```python
+class MultiplyByTen(beam.DoFn):
+  def process(self, element: np.int64) -> Iterator[np.int64]:
+    yield element * 10
+
+  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
+    yield batch * 10
+```
+In this example, the ‘np.int64’ type represents the individual element. The 'process' method multiplies an element by ten, yielding a single element. The ‘np.ndarray’ type represents the batch. The 'process_batch' method multiplies each element in the batch by ten, yielding a single batch. During pipeline execution, Apache Beam will automatically select the best implementation based on the context.
+
+By default, Apache Beam implicitly buffers elements and creates batches on the input side, then explodes batches back into individual elements on the output side. However, if batched 'DoFn's with equivalent types are chained together, this batch creation and explosion process is skipped, and the batches are passed through for more efficient processing.
+
+Here’s an example with chained ‘DoFn’s of equivalent types:
+
+```python
+(p | beam.Create([1, 2, 3, 4]).with_output_types(np.int64)
+   | beam.ParDo(MultiplyByTen())  # Implicit buffering and batch creation
+   | beam.ParDo(MultiplyByTen())  # Batches passed through
+   | beam.ParDo(MultiplyByTen()))
+```
+In this example, the ‘PTransform.with_output_types’ method sets the element-wise typehint for the output. Thus, when the `MultiplyByTen` class is applied to a `PCollection`, Apache Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.

Review Comment:
```suggestion
In this example, the ‘PTransform.with_output_types’ method sets the element-wise typehint for the output. Thus, when the `MultiplyByTen` class is applied to a `PCollection`, Apache Beam recognizes that `np.ndarray` is an acceptable batch type to use in conjunction with `np.int64` elements.
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]