andreydevyatkin commented on code in PR #30873:
URL: https://github.com/apache/beam/pull/30873#discussion_r1555991358


##########
learning/prompts/documentation-lookup-nolinks/09_basic_triggers.md:
##########
@@ -0,0 +1,30 @@
+Prompt:
+What is a trigger in Apache Beam?
+
+Response:
+Apache Beam uses triggers to determine when to emit the aggregated results of 
each window, referred to as a pane.
+
+Triggers provide two additional capabilities compared to outputting at the end 
of a window:
+- Allowing early results to be output before the end of the window.
+- Allowing late data to be handled after the end of the window.
+
+This feature lets you control the flow of your data and balance between 
completeness, latency, and cost.
+
+You set the triggers for a `PCollection` by setting the `trigger` parameter of 
the `WindowInto` transform.
+
+```python
+  pcollection | WindowInto(
+    FixedWindows(1 * 60),
+    trigger=AfterProcessingTime(1 * 60),
+    accumulation_mode=AccumulationMode.DISCARDING)

Review Comment:
   ```suggestion
   pcollection | WindowInto(
       FixedWindows(1 * 60),
       trigger=AfterProcessingTime(1 * 60),
       accumulation_mode=AccumulationMode.DISCARDING,
   )
   ```
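
   The snippet above only exercises `AfterProcessingTime`. Since the prose also mentions early results and late data, a composite trigger could make that concrete. A minimal sketch, assuming the standard `AfterWatermark`, `AfterProcessingTime`, and `AfterCount` triggers from `apache_beam.transforms.trigger` and the same `pcollection` as above:

   ```python
   import apache_beam as beam
   from apache_beam.transforms.trigger import (
       AccumulationMode,
       AfterCount,
       AfterProcessingTime,
       AfterWatermark,
   )
   from apache_beam.transforms.window import FixedWindows

   # Emit a speculative pane every 30 seconds of processing time before the
   # watermark passes the end of the window, plus one pane per late element.
   windowed = pcollection | beam.WindowInto(
       FixedWindows(1 * 60),
       trigger=AfterWatermark(early=AfterProcessingTime(30), late=AfterCount(1)),
       accumulation_mode=AccumulationMode.DISCARDING,
       allowed_lateness=10 * 60,
   )
   ```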



##########
learning/prompts/documentation-lookup-nolinks/25_io_csv.md:
##########
@@ -0,0 +1,29 @@
+Prompt:
+Is CSV format supported in Apache Beam?
+
+Response:
+CSV is a data format for storing and exchanging data. Apache Beam supports 
reading data from and writing data to CSV files using the `ReadFromText` and 
`WriteToText` transforms in the `TextIO` module. For more information, see the 
TextIO connector documentation for your programming language of choice.
+
+To get started with CSV and Apache Beam, refer to the Dataflow Cookbook GitHub 
repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a CSV 
file:
+
+```python
+class CsvOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+        parser.add_argument(
+            '--file_path',
+            default="gs://your-bucket/your-file.csv",
+            help='Csv file path'
+        )
+
+options = CsvOptions()
+
+with beam.Pipeline(options=options) as p:
+    output = (p | "Read from Csv file" >> ReadFromCsv(
+        path=options.file_path
+        )
+        | "Log Data" >> Map(logging.info))
+

Review Comment:
   ```suggestion
   class CsvOptions(PipelineOptions):
       @classmethod
       def _add_argparse_args(cls, parser):
           parser.add_argument(
               "--file_path",
               default="gs://your-bucket/your-file.csv",
               help="Csv file path",
           )
   
   
   options = CsvOptions()
   
   with beam.Pipeline(options=options) as p:
       output = (
           p
           | "Read from Csv file" >> ReadFromCsv(path=options.file_path)
           | "Log Data" >> Map(logging.info)
       )
   ```
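
   Since the prose also mentions writing CSV, a short write sketch could complement the read example. This is only a sketch: it assumes a recent Python SDK where the schema-aware `WriteToCsv` transform is available in `apache_beam.io`, and the output prefix is a placeholder.

   ```python
   import apache_beam as beam
   from apache_beam.io import WriteToCsv

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create rows" >> beam.Create([beam.Row(id=1, name="a"), beam.Row(id=2, name="b")])
           | "Write to Csv file" >> WriteToCsv(path="gs://your-bucket/output")
       )
   ```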



##########
learning/prompts/documentation-lookup-nolinks/20_io_biguery.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+Is BigQuery supported in Apache Beam?
+
+Response:
+Yes, Apache Beam supports BigQuery. BigQuery is a serverless and 
cost-effective enterprise data warehouse offered by Google Cloud. Apache Beam 
provides a BigQueryIO connector to read and write data from and to BigQuery. 
The BigQueryIO connector supports both batch and streaming pipelines.
+
+The following Apache Beam SDKs support the BigQueryIO connector:
+* Java (natively)
+* Python (natively)
+* Go (natively and through X Language)
+* Typescript (through X Language)
+
+To read data from BigQuery, use the `ReadFromBigQuery` function. Apache Beam 
can read data directly from a BigQuery table or using an SQL query. The default 
mode is to return table rows read from a BigQuery source as dictionaries. 
Built-in `TableRow` objects can also be returned.
+
+Here is an example of Apache Beam pipeline code for reading from BigQuery:
+
+```python
+from apache_beam.io.gcp.bigquery import ReadFromBigQuery
+
+with beam.Pipeline(options=options) as p:
+  # read from a table
+    lines_table = p | 'Read' >> ReadFromBigQuery(table=table)
+  # read from a query
+    lines_query = p | 'Read' >> ReadFromBigQuery(query="SELECT * FROM table")
+```
+
+Here is an example of Apache Beam pipeline code for writing to BigQuery:
+
+```python
+from apache_beam.io.gcp.bigquery import WriteToBigQuery
+
+with beam.Pipeline(options=options) as p:
+  # write to a table
+    p | 'Write' >> beam.io.WriteToBigQuery(
+        table,
+        schema=TABLE_SCHEMA,
+        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

Review Comment:
   ```suggestion
   from apache_beam.io.gcp.bigquery import WriteToBigQuery
   
   with beam.Pipeline(options=options) as p:
       # write to a table
       p | "Write" >> beam.io.WriteToBigQuery(
           table,
           schema=TABLE_SCHEMA,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       )
   ```
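
   One small addition that might help readers: `TABLE_SCHEMA` is referenced but never defined. A minimal sketch, assuming the comma-separated `"name:TYPE"` string form that `WriteToBigQuery` accepts (the field names and types here are illustrative):

   ```python
   # Schema as a "name:TYPE" string; a dictionary-based schema works as well.
   TABLE_SCHEMA = "id:INTEGER,name:STRING,score:FLOAT"
   ```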



##########
learning/prompts/documentation-lookup-nolinks/18_io_kafka.md:
##########
@@ -0,0 +1,24 @@
+Prompt:
+Is Apache Kafka supported in Apache Beam?
+
+Response:
+Apache Kafka is a distributed streaming platform that is used for building 
real-time data pipelines and streaming applications. It is horizontally 
scalable, fault-tolerant, and fast.
+
+Apache Beam has a built-in KafkaIO connector for Apache Kafka that enables 
Apache Beam pipelines to read data from and write data to Kafka. Apache Kafka 
is supported in both batch and streaming pipelines and can serve as both a 
source and a sink. For more information, refer to the KafkaIO connector 
documentation for your preferred programming language.
+
+To get started with Apache Kafka and Apache Beam, refer to the Dataflow 
Cookbook repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a Kafka 
topic:
+
+```python
+with beam.Pipeline(options=options) as p:
+    output = (p | "Read from Kafka" >> ReadFromKafka(
+                        consumer_config={'bootstrap.servers': 'localhost:9092'},
+                        topics=['my_topic'],
+                        with_metadata=False)
+                        | "Log Data" >> Map(logging.info))

Review Comment:
   ```suggestion
   with beam.Pipeline(options=options) as p:
       output = (
           p
           | "Read from Kafka"
           >> ReadFromKafka(
               consumer_config={"bootstrap.servers": "localhost:9092"},
               topics=["my_topic"],
               with_metadata=False,
           )
           | "Log Data" >> Map(logging.info)
       )
   ```
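
   The file says Kafka can also serve as a sink, so a write sketch could round this out. Assumptions: the cross-language `WriteToKafka` transform from `apache_beam.io.kafka`, byte-encoded key/value pairs, and the same broker and topic as above.

   ```python
   import typing

   import apache_beam as beam
   from apache_beam.io.kafka import WriteToKafka

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create messages" >> beam.Create([(b"key-1", b"value-1")]).with_output_types(
               typing.Tuple[bytes, bytes])
           | "Write to Kafka" >> WriteToKafka(
               producer_config={"bootstrap.servers": "localhost:9092"},
               topic="my_topic",
           )
       )
   ```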



##########
learning/prompts/documentation-lookup-nolinks/28_io_jdbc.md:
##########
@@ -0,0 +1,48 @@
+Prompt:
+Is Oracle Java Database Connectivity (JDBC) supported in Apache Beam?
+
+Response:
+The Oracle Java Database Connectivity (JDBC) API is a Java API for connecting 
to relational databases. Apache Beam supports reading and writing data with 
JDBC using the `ReadFromJdbc` and `WriteToJdbc` transforms in the `JdbcIO` 
module. The JdbcIO connector is native to Java and available through 
cross-language transforms in Python and Go.
+
+To get started with JDBC and Apache Beam, refer to the Dataflow Cookbook 
GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data with JDBC:
+
+```python
+class ExampleRow(NamedTuple):
+    id: int
+    name: str
+
+class ReadJdbcOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument(
+          "--driverClassName",
+          help="JDBC driver class name")
+      parser.add_argument(
+          "--url",
+          help="JDBC URL")
+      parser.add_argument(
+          "--username",
+          help="JDBC username")
+      parser.add_argument(
+          "--password",
+          help="JDBC password")
+      parser.add_argument(
+          "--query",
+          default="SELECT * FROM users",
+          help="JDBC query")
+
+options = ReadJdbcOptions()
+
+with beam.Pipeline(options=options) as p:
+
+  (p | "Read from JDBC" >> ReadFromJdbc(
+      driverClassName=options.driverClassName,
+      url=options.url,
+      username=options.username,
+      password=options.password,
+      query=options.query)
+     | Map(logging.info))

Review Comment:
   ```suggestion
   class ExampleRow(NamedTuple):
       id: int
       name: str
   
   
   class ReadJdbcOptions(PipelineOptions):
       @classmethod
       def _add_argparse_args(cls, parser):
           parser.add_argument("--driverClassName", help="JDBC driver class 
name")
           parser.add_argument("--url", help="JDBC URL")
           parser.add_argument("--username", help="JDBC username")
           parser.add_argument("--password", help="JDBC password")
           parser.add_argument("--query", default="SELECT * FROM users", 
help="JDBC query")
   
   
   options = ReadJdbcOptions()
   
   with beam.Pipeline(options=options) as p:
       (
           p
           | "Read from JDBC"
           >> ReadFromJdbc(
               driverClassName=options.driverClassName,
               url=options.url,
               username=options.username,
               password=options.password,
               query=options.query,
           )
           | Map(logging.info)
       )
   ```
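
   Since the prose also mentions `WriteToJdbc`, a write sketch might be useful. This is only a sketch: the parameter names below follow the cross-language `JdbcIO` Python API (`driver_class_name`, `jdbc_url`), which differs slightly from the read snippet above, so they should be checked against the SDK version in use.

   ```python
   import apache_beam as beam
   from apache_beam import coders
   from apache_beam.io.jdbc import WriteToJdbc

   # Register a row coder so the NamedTuple can cross the language boundary.
   coders.registry.register_coder(ExampleRow, coders.RowCoder)

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create rows" >> beam.Create([ExampleRow(1, "abc")]).with_output_types(ExampleRow)
           | "Write to JDBC" >> WriteToJdbc(
               table_name="users",
               driver_class_name=options.driverClassName,
               jdbc_url=options.url,
               username=options.username,
               password=options.password,
           )
       )
   ```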



##########
learning/prompts/documentation-lookup-nolinks/27_io_parquet.md:
##########
@@ -0,0 +1,27 @@
+Prompt:
+Is Apache Parquet format supported in Apache Beam?
+
+Response:
+Apache Parquet is a data format for storing and exchanging data. Apache Beam 
supports reading from and writing to Parquet files using the `ReadFromParquet` 
and `WriteToParquet` transforms in the `ParquetIO` module. For more 
information, see the ParquetIO connector documentation for your programming 
language of choice.
+
+To get started with Parquet and Apache Beam, refer to the Dataflow Cookbook 
GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a 
Parquet file:
+
+```python
+class ReadParquetOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument(
+          "--path",
+          help="GCS path to parquet file")
+
+  options = ReadParquetOptions()
+
+  with beam.Pipeline(options=options) as p:
+
+    (p | "ReadParquet" >> ReadFromParquet(options.path)
+       | Map(logging.info))
+

Review Comment:
   ```suggestion
   class ReadParquetOptions(PipelineOptions):
       @classmethod
       def _add_argparse_args(cls, parser):
           parser.add_argument("--path", help="GCS path to parquet file")
   
       options = ReadParquetOptions()
   
       with beam.Pipeline(options=options) as p:
           (p | "ReadParquet" >> ReadFromParquet(options.path) | 
Map(logging.info))
   ```
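
   Since the prose also mentions `WriteToParquet`, a short write sketch could be added. A sketch only: `WriteToParquet` takes a `pyarrow` schema, and the output prefix here is a placeholder.

   ```python
   import pyarrow
   import apache_beam as beam
   from apache_beam.io.parquetio import WriteToParquet

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create records" >> beam.Create([{"id": 1, "name": "a"}])
           | "WriteParquet" >> WriteToParquet(
               file_path_prefix="gs://your-bucket/output",
               schema=pyarrow.schema([("id", pyarrow.int64()), ("name", pyarrow.string())]),
           )
       )
   ```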



##########
learning/prompts/documentation-lookup-nolinks/22_io_spanner.md:
##########
@@ -0,0 +1,28 @@
+Prompt:
+Is Cloud Spanner supported in Apache Beam?
+
+Response:
+Cloud Spanner is a Google Cloud service that provides a fully managed, 
mission-critical, relational database service that offers transactional 
consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and 
automatic, synchronous replication for high availability.
+
+Apache Beam has a built-in SpannerIO connector for Cloud Spanner that enables 
Apache Beam pipelines to read data from and write data to Cloud Spanner. Cloud 
Spanner is supported in both batch pipelines and streaming pipelines and can 
serve both as a source and a sink. For more information, see the SpannerIO 
connector documentation for Java, Python, or Go.
+
+To get started with Cloud Spanner and Apache Beam, refer to the Dataflow 
Cookbook GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a Cloud 
Spanner table using an SQL query:
+
+```python
+class ExampleRow(NamedTuple):
+    id: int
+    name: str
+
+with beam.Pipeline(options=options) as p:
+    output = (p | "Read from table" >> ReadFromSpanner(
+        project_id=options.project_id,
+        instance_id=options.instance_id,
+        database_id=options.database_id,
+        row_type=ExampleRow,
+        sql="SELECT * FROM example_row"
+        )
+        | "Map Data" >> Map(lambda row: f"Id = {row.id}, Name = {row.name}")
+        | "Log Data" >> Map(logging.info))

Review Comment:
   ```suggestion
   class ExampleRow(NamedTuple):
       id: int
       name: str
   
   
   with beam.Pipeline(options=options) as p:
       output = (
           p
           | "Read from table"
           >> ReadFromSpanner(
               project_id=options.project_id,
               instance_id=options.instance_id,
               database_id=options.database_id,
               row_type=ExampleRow,
               sql="SELECT * FROM example_row",
           )
           | "Map Data" >> Map(lambda row: f"Id = {row.id}, Name = {row.name}")
           | "Log Data" >> Map(logging.info)
       )
   ```
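
   The file notes that Cloud Spanner can also be used as a sink; a write sketch could illustrate that. Assumptions: the cross-language `SpannerInsert` transform from `apache_beam.io.gcp.spanner`, the same `ExampleRow` type and pipeline options as above, and a table named `example_row`.

   ```python
   import apache_beam as beam
   from apache_beam import coders
   from apache_beam.io.gcp.spanner import SpannerInsert

   # Register a row coder so the NamedTuple can cross the language boundary.
   coders.registry.register_coder(ExampleRow, coders.RowCoder)

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create rows" >> beam.Create([ExampleRow(1, "abc")]).with_output_types(ExampleRow)
           | "Write to Spanner" >> SpannerInsert(
               project_id=options.project_id,
               instance_id=options.instance_id,
               database_id=options.database_id,
               table="example_row",
           )
       )
   ```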



##########
learning/prompts/documentation-lookup-nolinks/20_io_biguery.md:
##########
@@ -0,0 +1,41 @@
+Prompt:
+Is BigQuery supported in Apache Beam?
+
+Response:
+Yes, Apache Beam supports BigQuery. BigQuery is a serverless and 
cost-effective enterprise data warehouse offered by Google Cloud. Apache Beam 
provides a BigQueryIO connector to read and write data from and to BigQuery. 
The BigQueryIO connector supports both batch and streaming pipelines.
+
+The following Apache Beam SDKs support the BigQueryIO connector:
+* Java (natively)
+* Python (natively)
+* Go (natively and through X Language)
+* Typescript (through X Language)
+
+To read data from BigQuery, use the `ReadFromBigQuery` function. Apache Beam 
can read data directly from a BigQuery table or using an SQL query. The default 
mode is to return table rows read from a BigQuery source as dictionaries. 
Built-in `TableRow` objects can also be returned.
+
+Here is an example of Apache Beam pipeline code for reading from BigQuery:
+
+```python
+from apache_beam.io.gcp.bigquery import ReadFromBigQuery
+
+with beam.Pipeline(options=options) as p:
+  # read from a table
+    lines_table = p | 'Read' >> ReadFromBigQuery(table=table)
+  # read from a query
+    lines_query = p | 'Read' >> ReadFromBigQuery(query="SELECT * FROM table")

Review Comment:
   ```suggestion
   from apache_beam.io.gcp.bigquery import ReadFromBigQuery
   
   with beam.Pipeline(options=options) as p:
       # read from a table
       lines_table = p | "Read" >> ReadFromBigQuery(table=table)
       # read from a query
       lines_query = p | "Read" >> ReadFromBigQuery(query="SELECT * FROM table")
   ```



##########
learning/prompts/documentation-lookup-nolinks/08_basic_windowing.md:
##########
@@ -0,0 +1,31 @@
+Prompt:
+What is windowing in Apache Beam?
+
+Response:
+In stream processing, windowing allows you to divide data streams into logical 
units for efficient and correct parallel processing.
+
+With an unbounded data set, collecting all of the elements is impossible, 
because new elements are constantly being added. In the Apache Beam model, any 
`PCollection` (including unbounded `PCollection` objects) can be subdivided 
into logical windows. Grouping transforms then consider the elements of each 
`PCollection` on a per-window basis.
+
+Because Apache Beam's default windowing strategy is to assign each element to 
a single, global window, you must explicitly specify a windowing function for 
your pipeline.
+
+The following code snippet shows how to divide a `PCollection` into 60-second 
windows:
+
+```python
+from apache_beam import beam
+from apache_beam import window
+fixed_windowed_items = (
+    items | 'window' >> beam.WindowInto(window.FixedWindows(60)))

Review Comment:
   ```suggestion
   from apache_beam import beam
   from apache_beam import window
   
   fixed_windowed_items = items | 'window' >> beam.WindowInto(window.FixedWindows(60))
   ```
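
   To make the windowing choices a bit more concrete, a sliding-window variant could follow the fixed-window snippet. A sketch, reusing the `items` collection from above; `SlidingWindows(60, 30)` produces 60-second windows that start every 30 seconds.

   ```python
   import apache_beam as beam
   from apache_beam import window

   # 60-second windows starting every 30 seconds.
   sliding_windowed_items = items | 'sliding_window' >> beam.WindowInto(
       window.SlidingWindows(60, 30))
   ```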



##########
learning/prompts/documentation-lookup-nolinks/07_basic_runners.md:
##########
@@ -0,0 +1,25 @@
+Prompt:
+What is a runner in Apache Beam?
+
+Response:
+Apache Beam runners are the execution engines that run the pipelines. They 
translate or adapt the pipeline into a form that can be executed on a massively 
parallel big data processing system, such as Apache Flink, Apache Spark, Google 
Cloud Dataflow, and more.
+
+Choosing a runner is an important step in the pipeline development process. 
The runner you choose determines where and how your pipeline runs. For more 
information about available runners and their capabilities, see the Beam 
Capability Matrix.
+
+To specify a runner, when you execute the pipeline, use the `--runner` flag. 
For example, to run the WordCount pipeline on Google Cloud Dataflow, you would 
run the following command:
+
+```python
+python -m apache_beam.examples.wordcount \
+  --region DATAFLOW_REGION \
+  --input gs://dataflow-samples/shakespeare/kinglear.txt \
+  --output gs://STORAGE_BUCKET/results/outputs \
+  --runner DataflowRunner \
+  --project PROJECT_ID \
+  --temp_location gs://STORAGE_BUCKET/tmp/
+  ```

Review Comment:
   nit: extra spaces need to be removed
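
   Related to the `--runner` flag discussed in this file, the runner can also be set programmatically. A minimal sketch, assuming the standard `StandardOptions` view; the runner name is illustrative.

   ```python
   from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

   options = PipelineOptions()
   # Equivalent to passing --runner on the command line.
   options.view_as(StandardOptions).runner = "DataflowRunner"
   ```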



##########
learning/prompts/documentation-lookup-nolinks/24_io_json.md:
##########
@@ -0,0 +1,28 @@
+Prompt:
+Is JSON supported in Apache Beam?
+
+Response:
+JSON is a data format for storing and exchanging data. Apache Beam supports 
reading data from and writing data to JSON files using the `ReadFromJson` and 
`WriteToJson` transforms within the `TextIO` module. For more information, see 
the TextIO connector documentation for your programming language of choice.
+
+To get started with JSON and Apache Beam, refer to the Dataflow Cookbook 
GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a JSON 
file:
+
+```python
+class JsonOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+        parser.add_argument(
+            '--file_path',
+            default="gs://your-bucket/your-file.json",
+            help='Json file path'
+        )
+
+options = JsonOptions()
+with beam.Pipeline(options=options) as p:
+    output = (p | "Read from Json file" >> ReadFromJson(
+        path=options.file_path,
+        lines=False
+        )
+        | "Log Data" >> Map(logging.info))

Review Comment:
   ```suggestion
   class JsonOptions(PipelineOptions):
       @classmethod
       def _add_argparse_args(cls, parser):
           parser.add_argument(
               "--file_path",
               default="gs://your-bucket/your-file.json",
               help="Json file path",
           )
   
   
   options = JsonOptions()
   with beam.Pipeline(options=options) as p:
       output = (
           p
           | "Read from Json file" >> ReadFromJson(path=options.file_path, 
lines=False)
           | "Log Data" >> Map(logging.info)
       )
   ```
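
   The prose also mentions `WriteToJson`; a short write sketch could complement the read example. A sketch only: it assumes a recent Python SDK where the schema-aware `WriteToJson` transform is available in `apache_beam.io`, and the output prefix is a placeholder.

   ```python
   import apache_beam as beam
   from apache_beam.io import WriteToJson

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create rows" >> beam.Create([beam.Row(id=1, name="a")])
           | "Write to Json file" >> WriteToJson(path="gs://your-bucket/output")
       )
   ```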



##########
learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md:
##########
@@ -0,0 +1,80 @@
+Prompt:
+How to execute Apache Beam pipelines with Cloud Dataflow?
+
+Response:
+Apache Beam supports multiple runners for executing pipelines on various 
platforms. One such runner is the Cloud Dataflow Runner, tailored for running 
pipelines on the Google Cloud Dataflow service. Cloud Dataflow offers fully 
managed and unified stream and batch data processing, boasting dynamic work 
rebalancing and built-in autoscaling capabilities.
+
+When you execute a pipeline on Cloud Dataflow, the Runner uploads your code 
and dependencies to a Cloud Storage bucket and creates a Dataflow job, which 
then executes your pipeline on managed resources within the Google Cloud 
Platform.
+
+To execute Apache Beam pipelines using the Cloud Dataflow Runner, follow these 
steps:
+
+***1. Setup Your Cloud Project and Resources:***
+
+Complete the steps outlined in the 'Before You Begin' section of the Cloud 
Dataflow quickstart for your chosen programming language:
+1. Select or create a Google Cloud Platform Console project.
+2. Enable billing for your project.
+3. Enable the required Google Cloud APIs, including Cloud Dataflow, Compute 
Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, and Cloud 
Resource Manager. Additional APIs may be necessary depending on your pipeline 
code.
+4. Authenticate with Google Cloud Platform.
+5. Install the Google Cloud SDK.
+6. Create a Cloud Storage bucket.
+
+***2. Specify Dependencies (Java Only):***
+
+When using the Apache Beam Java SDK, specify your dependency on the Cloud 
Dataflow Runner in the `pom.xml` file of your Java project directory.
+
+```java
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+  <version>2.54.0</version>
+  <scope>runtime</scope>
+</dependency>
+```
+
+Ensure that you include all necessary dependencies to create a self-contained 
application. In some cases, such as when starting a pipeline using a scheduler, 
you'll need to package a self-executing JAR by explicitly adding a dependency 
in the Project section of your `pom.xml` file. For more details about running 
self-executing JARs on Cloud Dataflow, refer to the 'Self-executing JAR' 
section in the Apache Beam documentation on Cloud Dataflow Runner.
+
+***3. Configure Pipeline Options:***
+
+Configure the execution details, including the runner (set to `dataflow` or 
`DataflowRunner`), Cloud project ID, region, and streaming mode, using the 
`GoogleCloudOptions` interface for Python or the `DataflowPipelineOptions` 
interface for Java.
+
+You can utilize pipeline options to control various aspects of how Cloud 
Dataflow executes your job. For instance, you can specify whether your pipeline 
runs on worker virtual machines, on the Cloud Dataflow service backend, or 
locally. For additional pipeline configuration options, refer to the reference 
documentation for the respective interface.
+
+***4. Run Your Pipeline on Cloud Dataflow:***
+
+Execute your pipeline on Cloud Dataflow using the appropriate command for your 
SDK.
+
+The following example code, taken from the Cloud Dataflow quickstarts for Java 
and Python, shows how to run the WordCount example pipeline on Dataflow.
+
+For the Apache Beam Java SDK, in your terminal, run the following command 
(from your `word-count-beam` directory):
+
+```java
+  mvn -Pdataflow-runner compile exec:java \
+    -Dexec.mainClass=org.apache.beam.examples.WordCount \
+    -Dexec.args="--project=PROJECT_ID \
+    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
+    --output=gs://BUCKET_NAME/output \
+    --runner=DataflowRunner \
+    --region=REGION

Review Comment:
   ```suggestion
   mvn -Pdataflow-runner compile exec:java \
     -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--project=PROJECT_ID \
     --gcpTempLocation=gs://BUCKET_NAME/temp/ \
     --output=gs://BUCKET_NAME/output \
     --runner=DataflowRunner \
     --region=REGION
   ```
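
   For the Python side of step 3, a minimal sketch of configuring the options mentioned above (`GoogleCloudOptions` plus the runner); the placeholder values mirror the ones used elsewhere in this file.

   ```python
   from apache_beam.options.pipeline_options import (
       GoogleCloudOptions,
       PipelineOptions,
       StandardOptions,
   )

   options = PipelineOptions()
   options.view_as(StandardOptions).runner = "DataflowRunner"

   gcp_options = options.view_as(GoogleCloudOptions)
   gcp_options.project = "PROJECT_ID"
   gcp_options.region = "REGION"
   gcp_options.temp_location = "gs://BUCKET_NAME/temp/"
   ```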



##########
learning/prompts/documentation-lookup-nolinks/19_io_pubsub.md:
##########
@@ -0,0 +1,20 @@
+Prompt:
+Is Pub/Sub supported in Apache Beam?
+
+Response:
+Pub/Sub is a Google Cloud service that provides a simple, reliable, scalable, 
and secure real-time messaging service for sending and receiving messages 
between independent applications. Apache Beam provides a PubSubIO connector 
that allows you to read and write messages from and to Pub/Sub. Pub/Sub is 
currently supported only in streaming pipelines.
+
+Pub/Sub is supported natively in the Beam SDKs for Java, Python, and Go. It is 
also available in Typescript through X Language.
+
+To get started with Pub/Sub and Apache Beam, the Dataflow Cookbook GitHub 
repository provides code examples for reading and writing data from and to 
Pub/Sub.
+
+Here is an example of the Apache Beam pipeline code for reading data from a 
Pub/Sub topic and writing it to another topic:
+
+```python
+from apache_beam.io import ReadFromPubSub
+from apache_beam.io import WriteToPubSub
+
+  with beam.Pipeline(options=options) as p:
+    (p | "Read from Pub/Sub" >> ReadFromPubSub(topic="input_topic")
+       | "Write to Pub/Sub" >> WriteToPubSub(topic="output_topic"))

Review Comment:
   ```suggestion
   from apache_beam.io import ReadFromPubSub
   from apache_beam.io import WriteToPubSub
   
   with beam.Pipeline(options=options) as p:
       (
           p
           | "Read from Pub/Sub" >> ReadFromPubSub(topic="input_topic")
           | "Write to Pub/Sub" >> WriteToPubSub(topic="output_topic")
       )
   ```
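
   One detail the example does not show: `ReadFromPubSub` can read from a subscription instead of a topic. A minimal sketch; the project and subscription names are placeholders.

   ```python
   import apache_beam as beam
   from apache_beam.io import ReadFromPubSub

   with beam.Pipeline(options=options) as p:
       messages = p | "Read from subscription" >> ReadFromPubSub(
           subscription="projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID")
   ```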



##########
learning/prompts/documentation-lookup-nolinks/21_io_bigtable.md:
##########
@@ -0,0 +1,24 @@
+Prompt:
+Is Cloud Bigtable supported in Apache Beam?
+
+Response:
+Cloud Bigtable is a Google Cloud service that provides enterprise-grade NoSQL 
database service with single-digit millisecond latency, limitless scale, and 
99.999% availability for large analytical and operational workloads.
+
+Apache Beam has a built-in connector for Cloud Bigtable that lets you read 
data from and write data to Bigtable. For more information, see the BigtableIO 
connector documentation for your preferred programming language.
+
+To get started with Cloud Bigtable and Apache Beam, refer to the Dataflow 
Cookbook GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data from a 
Bigtable table:
+
+```python
+with beam.Pipeline(options=options) as p:
+output = (p | "Read from Bigtable" >> ReadFromBigtable(
+                    project_id=project_id,
+                    instance_id=instance_id,
+                    table_id=table_id,
+                 )
+                    | "Extract cells" >> beam.Map(lambda row: row._cells)
+                    | "Log Data" >> Map(logging.info))

Review Comment:
   ```suggestion
   with beam.Pipeline(options=options) as p:
       output = (
           p
           | "Read from Bigtable"
           >> ReadFromBigtable(
               project_id=project_id,
               instance_id=instance_id,
               table_id=table_id,
           )
           | "Extract cells" >> beam.Map(lambda row: row._cells)
           | "Log Data" >> Map(logging.info)
       )
   ```
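
   Since the prose says Bigtable also supports writes, a write sketch could be added. Assumptions: the `WriteToBigTable` transform from `apache_beam.io.gcp.bigtableio`, `DirectRow` mutations from the `google-cloud-bigtable` client, an existing column family named `family`, and the same `project_id`, `instance_id`, and `table_id` variables as above.

   ```python
   import apache_beam as beam
   from apache_beam.io.gcp.bigtableio import WriteToBigTable
   from google.cloud.bigtable.row import DirectRow


   def make_row(key):
       # Build a single-cell mutation for the given row key.
       row = DirectRow(row_key=key)
       row.set_cell("family", b"column", b"value")
       return row


   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create keys" >> beam.Create([b"row-1", b"row-2"])
           | "To mutations" >> beam.Map(make_row)
           | "Write to Bigtable" >> WriteToBigTable(
               project_id=project_id,
               instance_id=instance_id,
               table_id=table_id,
           )
       )
   ```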



##########
learning/prompts/documentation-lookup-nolinks/29_advanced_side_inputs.md:
##########
@@ -0,0 +1,36 @@
+Prompt:
+What are side inputs in Apache Beam?
+
+Response:
+In Apache Beam, side inputs are additional inputs that can be provided to a 
`ParDo` transform, in addition to the main input `PCollection`. These side 
inputs can be used by the `ParDo` transform's `DoFn` to access and read 
additional data during the processing of each element in the main input 
`PCollection`.
+
+Side inputs are particularly useful when a Beam pipeline needs to ingest 
additional data dynamically at runtime when processing each element, rather 
than relying on preset or hard-coded values. By leveraging side inputs, Apache 
Beam can determine additional data based on the main `PCollection` data or even 
from a different branch of your pipeline. For example, you can leverage side 
inputs for your streaming analytics use cases, enriching streaming events with 
additional data retrieved from a lookup table.
+
+In the Apache Beam Python SDK, you can provide side inputs as extra arguments 
in the `DoFn`'s `process` method or the `Map`/`FlatMap` transforms. The Python 
SDK supports optional, positional, and keyword arguments.
+
+```python
+class MyDoFn(beam.DoFn):
+  def process(self, element, side_input):
+    ...
+```
+
+In the Apache Beam Java SDK, side inputs are provided to the `ParDo` transform 
by using the `.withSideInputs` method. To access a side input from within a 
`DoFn`, the `DoFn.ProcessContext.sideInput` method is invoked.
+
+```java
+PCollection<Integer> input = ...;
+PCollectionView<Integer> sideInput = ...;
+PCollection<Integer> output =
+    input.apply(ParDo.of(new DoFn<Integer, Integer>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        Integer sideInputValue = c.sideInput(sideInput);
+        ...
+      }
+    }).withSideInputs(sideInput));

Review Comment:
   ```suggestion
   PCollection<Integer> input = ...;
   PCollectionView<Integer> sideInput = ...;
   PCollection<Integer> output =
       input.apply(ParDo.of(new DoFn<Integer, Integer>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
               Integer sideInputValue = c.sideInput(sideInput);
               ...
           }
       }).withSideInputs(sideInput));
   ```
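
   To complement the Python snippet above (which only shows the `process` signature), here is a minimal sketch of how a side input is actually passed, assuming `beam.pvalue.AsSingleton`; the element values are illustrative.

   ```python
   import apache_beam as beam

   with beam.Pipeline() as p:
       main = p | "Main input" >> beam.Create([1, 2, 3])
       side = p | "Side input" >> beam.Create([10])

       # The side input is passed as an extra keyword argument; AsSingleton
       # materializes it as a single value for every main-input element.
       shifted = main | "Add offset" >> beam.Map(
           lambda element, offset: element + offset,
           offset=beam.pvalue.AsSingleton(side),
       )
   ```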



##########
learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,101 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, 
refer to one or multiple extra `PCollection` objects produced by a single 
`ParDo` transform in addition to the main output `PCollection`. The `ParDo` 
transform has the flexibility to produce any number of additional output 
`PCollection` objects and return them bundled together with the main output 
`PCollection`.
+
+Additional outputs serve as a mechanism to implement pipeline branching. You 
can use them when there is a need to split the output of a single transform 
into several `PCollection` objects or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the input `PCollection` just once.
+
+Producing additional outputs requires tagging each output `PCollection` with a 
unique identifier, which is then used to emit elements to the corresponding 
output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag` objects for each of the outputs, the 
tags are passed to the `ParDo` using the `.withOutputTags` method.
+
+The following Java code implements two additional output `PCollection` objects 
for string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+  PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));

Review Comment:
   ```suggestion
   // Input PCollection that contains strings.
   PCollection<String> input = ...;
   // Output tag for the main output PCollection of strings.
   final TupleTag<String> mainOutputTag = new TupleTag<String>() {};
   // Output tag for the additional output PCollection of strings.
   final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>() {};
   // Output tag for the additional output PCollection of integers.
   final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>() {};
   
   PCollectionTuple results = input.apply(ParDo
       .of(new DoFn<String, String>() {
           // DoFn continues here.
           ...
       })
       // Specify the tag for the main output.
       .withOutputTags(mainOutputTag,
           // Specify the tags for the two additional outputs as a TupleTagList.
           TupleTagList.of(additionalOutputTagString)
           .and(additionalOutputTagIntegers)));
   ```



##########
learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,101 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, 
refer to one or multiple extra `PCollection` objects produced by a single 
`ParDo` transform in addition to the main output `PCollection`. The `ParDo` 
transform has the flexibility to produce any number of additional output 
`PCollection` objects and return them bundled together with the main output 
`PCollection`.
+
+Additional outputs serve as a mechanism to implement pipeline branching. You 
can use them when there is a need to split the output of a single transform 
into several `PCollection` objects or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the input `PCollection` just once.
+
+Producing additional outputs requires tagging each output `PCollection` with a 
unique identifier, which is then used to emit elements to the corresponding 
output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag` objects for each of the outputs, the 
tags are passed to the `ParDo` using the `.withOutputTags` method.
+
+The following Java code implements two additional output `PCollection` objects 
for string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+  PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));
+```
+
+The `processElement` method can emit elements to the main output or any 
additional output by invoking the output method on the `MultiOutputReceiver` 
object. The output method takes the tag of the output and the element to be 
emitted as arguments.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+       if (condition for main output) {
+         // Emit element to main output
+         out.get(mainOutputTag).output(word);
+       } else {
+         // Emit element to additional string output
+         out.get(additionalOutputTagString).output(word);
+       }
+       if (condition for additional integer output) {
+         // Emit element to additional integer output
+         out.get(additionalOutputTagIntegers).output(word.length());
+       }
+     }
+```
+
+In the Apache Beam Python SDK, you can implement additional outputs by 
invoking the `with_outputs()` method on the `ParDo` transform and specifying 
the expected tags for the multiple outputs.
+
+The following Python code demonstrates how to implement additional outputs for 
a `ParDo` transform that outputs two `PCollection` objects of strings and 
integers in addition to the main output `PCollection` of strings:
+
+```python
+class SplitLinesToWordsFn(beam.DoFn):
+
+  # These tags will be used to tag the outputs of this DoFn.
+  OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+
+  def process(self, element):
+    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection.
+    yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
+
+    words = re.findall(r'[A-Za-z\']+', element)
+    for word in words:
+      if len(word) <= 3:
+        # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection.
+        yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
+      else:
+        # yield word to add it to the main collection.
+        yield word

Review Comment:
   ```suggestion
   class SplitLinesToWordsFn(beam.DoFn):
       # These tags will be used to tag the outputs of this DoFn.
       OUTPUT_TAG_SHORT_WORDS = "tag_short_words"
       OUTPUT_TAG_CHARACTER_COUNT = "tag_character_count"
   
       def process(self, element):
           # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection.
           yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
   
           words = re.findall(r"[A-Za-z\']+", element)
           for word in words:
               if len(word) <= 3:
                   # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection.
                   yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
               else:
                   # yield word to add it to the main collection.
                   yield word
   ```



##########
learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,101 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, 
refer to one or multiple extra `PCollection` objects produced by a single 
`ParDo` transform in addition to the main output `PCollection`. The `ParDo` 
transform has the flexibility to produce any number of additional output 
`PCollection` objects and return them bundled together with the main output 
`PCollection`.
+
+Additional outputs serve as a mechanism to implement pipeline branching. You 
can use them when there is a need to split the output of a single transform 
into several `PCollection` objects or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the input `PCollection` just once.
+
+Producing additional outputs requires tagging each output `PCollection` with a 
unique identifier, which is then used to emit elements to the corresponding 
output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag` objects for each of the outputs, the 
tags are passed to the `ParDo` using the `.withOutputTags` method.
+
+The following Java code implements two additional output `PCollection` objects 
for string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+  PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));
+```
+
+The `processElement` method can emit elements to the main output or any 
additional output by invoking the output method on the `MultiOutputReceiver` 
object. The output method takes the tag of the output and the element to be 
emitted as arguments.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+       if (condition for main output) {
+         // Emit element to main output
+         out.get(mainOutputTag).output(word);
+       } else {
+         // Emit element to additional string output
+         out.get(additionalOutputTagString).output(word);
+       }
+       if (condition for additional integer output) {
+         // Emit element to additional integer output
+         out.get(additionalOutputTagIntegers).output(word.length());
+       }
+     }

Review Comment:
   ```suggestion
   public void processElement(@Element String word, MultiOutputReceiver out) {
       if (condition
           for main output) {
           // Emit element to main output
           out.get(mainOutputTag).output(word);
       } else {
           // Emit element to additional string output
           out.get(additionalOutputTagString).output(word);
       }
       if (condition
           for additional integer output) {
           // Emit element to additional integer output
           out.get(additionalOutputTagIntegers).output(word.length());
       }
   }
   ```



##########
learning/prompts/documentation-lookup-nolinks/26_io_avro.md:
##########
@@ -0,0 +1,27 @@
+Prompt:
+Is Apache Avro format supported in Apache Beam?
+
+Response:
+Apache Avro is a data format for storing and exchanging data. Apache Beam 
supports reading from and writing to Avro files using the `ReadFromAvro` and 
`WriteToAvro` transforms in the `AvroIO` module. For more information, see the 
AvroIO connector documentation for your programming language of choice.
+
+To get started with Avro and Apache Beam, refer to the Dataflow Cookbook 
GitHub repository.
+
+Here is an example of Apache Beam pipeline code for reading data from an Avro 
file:
+
+```python
+class ReadAvroOptions(PipelineOptions):
+
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_argument(
+          "--path",
+          default="gs://cloud-samples-data/bigquery/us-states/*.avro",
+          help="GCS path to read from")
+
+  options = ReadAvroOptions()
+
+  with beam.Pipeline(options=options) as p:
+
+    (p | "Read from Avro" >> ReadFromAvro(options.path)
+       | Map(logging.info))

Review Comment:
   ```suggestion
   class ReadAvroOptions(PipelineOptions):
       @classmethod
       def _add_argparse_args(cls, parser):
           parser.add_argument(
               "--path",
               default="gs://cloud-samples-data/bigquery/us-states/*.avro",
               help="GCS path to read from",
           )
   
       options = ReadAvroOptions()
   
       with beam.Pipeline(options=options) as p:
           (p | "Read from Avro" >> ReadFromAvro(options.path) | 
Map(logging.info))
   ```
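
   The prose also mentions `WriteToAvro`; a short write sketch could be added. A sketch only: it assumes a fastavro-style dict schema, which recent SDK versions accept, and a placeholder output prefix.

   ```python
   import apache_beam as beam
   from apache_beam.io.avroio import WriteToAvro

   SCHEMA = {
       "type": "record",
       "name": "Example",
       "fields": [
           {"name": "id", "type": "int"},
           {"name": "name", "type": "string"},
       ],
   }

   with beam.Pipeline(options=options) as p:
       (
           p
           | "Create records" >> beam.Create([{"id": 1, "name": "a"}])
           | "Write to Avro" >> WriteToAvro("gs://your-bucket/output", schema=SCHEMA)
       )
   ```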



##########
learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md:
##########
@@ -0,0 +1,80 @@
+Prompt:
+How to execute Apache Beam pipelines with Cloud Dataflow?
+
+Response:
+Apache Beam supports multiple runners for executing pipelines on various 
platforms. One such runner is the Cloud Dataflow Runner, tailored for running 
pipelines on the Google Cloud Dataflow service. Cloud Dataflow offers fully 
managed and unified stream and batch data processing, boasting dynamic work 
rebalancing and built-in autoscaling capabilities.
+
+When you execute a pipeline on Cloud Dataflow, the Runner uploads your code 
and dependencies to a Cloud Storage bucket and creates a Dataflow job, which 
then executes your pipeline on managed resources within the Google Cloud 
Platform.
+
+To execute Apache Beam pipelines using the Cloud Dataflow Runner, follow these 
steps:
+
+***1. Setup Your Cloud Project and Resources:***
+
+Complete the steps outlined in the 'Before You Begin' section of the Cloud 
Dataflow quickstart for your chosen programming language:
+1. Select or create a Google Cloud Platform Console project.
+2. Enable billing for your project.
+3. Enable the required Google Cloud APIs, including Cloud Dataflow, Compute 
Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, and Cloud 
Resource Manager. Additional APIs may be necessary depending on your pipeline 
code.
+4. Authenticate with Google Cloud Platform.
+5. Install the Google Cloud SDK.
+6. Create a Cloud Storage bucket.
+
+***2. Specify Dependencies (Java Only):***
+
+When using the Apache Beam Java SDK, specify your dependency on the Cloud 
Dataflow Runner in the `pom.xml` file of your Java project directory.
+
+```java
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+  <version>2.54.0</version>
+  <scope>runtime</scope>
+</dependency>
+```
+
+Ensure that you include all necessary dependencies to create a self-contained 
application. In some cases, such as when starting a pipeline using a scheduler, 
you'll need to package a self-executing JAR by explicitly adding a dependency 
in the Project section of your `pom.xml` file. For more details about running 
self-executing JARs on Cloud Dataflow, refer to the 'Self-executing JAR' 
section in the Apache Beam documentation on Cloud Dataflow Runner.
+
+***3. Configure Pipeline Options:***
+
+Configure the execution details, including the runner (set to `dataflow` or 
`DataflowRunner`), Cloud project ID, region, and streaming mode, using the 
`GoogleCloudOptions` interface for Python or the `DataflowPipelineOptions` 
interface for Java.
+
+You can utilize pipeline options to control various aspects of how Cloud 
Dataflow executes your job. For instance, you can specify whether your pipeline 
runs on worker virtual machines, on the Cloud Dataflow service backend, or 
locally. For additional pipeline configuration options, refer to the reference 
documentation for the respective interface.
+
+***4. Run Your Pipeline on Cloud Dataflow:***
+
+Execute your pipeline on Cloud Dataflow using the appropriate command for your 
SDK.
+
+The following example code, taken from the Cloud Dataflow quickstarts for Java 
and Python, shows how to run the WordCount example pipeline on Dataflow.
+
+For the Apache Beam Java SDK, in your terminal, run the following command 
(from your `word-count-beam` directory):
+
+```java
+  mvn -Pdataflow-runner compile exec:java \
+    -Dexec.mainClass=org.apache.beam.examples.WordCount \
+    -Dexec.args="--project=PROJECT_ID \
+    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
+    --output=gs://BUCKET_NAME/output \
+    --runner=DataflowRunner \
+    --region=REGION
+   ```

Review Comment:
   nit: remove extra spaces



##########
learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md:
##########
@@ -0,0 +1,101 @@
+Prompt:
+What are additional outputs in Apache Beam?
+
+Response:
+In Apache Beam, additional outputs, also known as tagged or side outputs, 
refer to one or multiple extra `PCollection` objects produced by a single 
`ParDo` transform in addition to the main output `PCollection`. The `ParDo` 
transform has the flexibility to produce any number of additional output 
`PCollection` objects and return them bundled together with the main output 
`PCollection`.
+
+Additional outputs serve as a mechanism to implement pipeline branching. You 
can use them when there is a need to split the output of a single transform 
into several `PCollection` objects or produce outputs in different formats. 
Additional outputs become particularly beneficial when a transform’s 
computation per element is time-consuming because they enable transforms to 
process each element in the input `PCollection` just once.
+
+Producing additional outputs requires tagging each output `PCollection` with a 
unique identifier, which is then used to emit elements to the corresponding 
output.
+
+In the Apache Beam Java SDK, you can implement additional outputs by creating 
a `TupleTag` object to identify each collection produced by the `ParDo` 
transform. After specifying the `TupleTag` objects for each of the outputs, the 
tags are passed to the `ParDo` using the `.withOutputTags` method.
+
+The following Java code implements two additional output `PCollection` objects 
for string and integer values in addition to the main output `PCollection` of 
strings:
+
+```java
+// Input PCollection that contains strings.
+  PCollection<String> input = ...;
+// Output tag for the main output PCollection of strings.
+final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
+// Output tag for the additional output PCollection of strings.
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
+// Output tag for the additional output PCollection of integers.
+final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
+
+PCollectionTuple results = input.apply(ParDo
+          .of(new DoFn<String, String>() {
+            // DoFn continues here.
+            ...
+          })
+          // Specify the tag for the main output.
+          .withOutputTags(mainOutputTag,
+          // Specify the tags for the two additional outputs as a TupleTagList.
+                          TupleTagList.of(additionalOutputTagString)
+                                      .and(additionalOutputTagIntegers)));
+```
+
+The `processElement` method can emit elements to the main output or any 
additional output by invoking the output method on the `MultiOutputReceiver` 
object. The output method takes the tag of the output and the element to be 
emitted as arguments.
+
+```java
+public void processElement(@Element String word, MultiOutputReceiver out) {
+       if (condition for main output) {
+         // Emit element to main output
+         out.get(mainOutputTag).output(word);
+       } else {
+         // Emit element to additional string output
+         out.get(additionalOutputTagString).output(word);
+       }
+       if (condition for additional integer output) {
+         // Emit element to additional integer output
+         out.get(additionalOutputTagIntegers).output(word.length());
+       }
+     }
+```
+
+In the Apache Beam Python SDK, you can implement additional outputs by 
invoking the `with_outputs()` method on the `ParDo` transform and specifying 
the expected tags for the multiple outputs.
+
+The following Python code demonstrates how to implement additional outputs for 
a `ParDo` transform that outputs two `PCollection` objects of strings and 
integers in addition to the main output `PCollection` of strings:
+
+```python
+class SplitLinesToWordsFn(beam.DoFn):
+
+  # These tags will be used to tag the outputs of this DoFn.
+  OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
+  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
+
+  def process(self, element):
+    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection.
+    yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
+
+    words = re.findall(r'[A-Za-z\']+', element)
+    for word in words:
+      if len(word) <= 3:
+        # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection.
+        yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
+      else:
+        # yield word to add it to the main collection.
+        yield word
+```
+
+The method returns a `DoOutputsTuple` object, with the specified tags serving 
as attributes that provide `ParDo` with access to the corresponding output 
`PCollection` objects.
+
+```python
+with beam.Pipeline(options=pipeline_options) as p:
+
+    lines = p | ReadFromText(known_args.input)
+
+    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+    split_lines_result = (
+        lines
+        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+            main='words'))
+
+    # split_lines_result is an object of type DoOutputsTuple
+    words, _, _ = split_lines_result
+    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+    character_count = split_lines_result.tag_character_count

Review Comment:
   ```suggestion
   with beam.Pipeline(options=pipeline_options) as p:
       lines = p | ReadFromText(known_args.input)
   
       # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
       split_lines_result = lines | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
           SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
           SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
           main="words",
       )
   
       # split_lines_result is an object of type DoOutputsTuple
       words, _, _ = split_lines_result
       short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
       character_count = split_lines_result.tag_character_count
   ```



##########
learning/prompts/documentation-lookup-nolinks/54_spark_runner.md:
##########
@@ -0,0 +1,148 @@
+Prompt:
+How to execute Apache Beam pipelines with Apache Spark?
+
+Response:
+Apache Beam supports multiple runners for executing pipelines on various 
platforms. One such runner is the Spark Runner, tailored for running pipelines 
using Apache Spark. The Spark Runner enables you to leverage the scalability 
and parallel processing capabilities of Apache Spark for your data processing 
tasks.
+
+Key features of the Spark Runner include:
+* Support for batch, streaming, and unified pipelines.
+* Offering the same fault-tolerance guarantees and security features as 
provided by Apache Spark.
+* Built-in metrics reporting using Apache Spark’s metrics system, which also 
reports Beam Aggregators.
+* Native support for Apache Beam side inputs via Apache Spark’s broadcast 
variables.
+
+There are three types of Spark Runners available:
+1. Legacy Spark Runner: supports Java (and other JVM-based languages) 
exclusively, based on Apache Spark’s RDD and DStream.
+2. Structured Streaming Spark Runner: supports Java (and other JVM-based 
languages) exclusively, based on Apache Spark's Datasets and Structured 
Streaming framework. Currently, it only supports batch mode with limited 
coverage of the Apache Beam model.
+3. Portable Spark Runner: supports Java, Python, and Go.
+
+For Java-based applications, consider using the Java-based runners, while for 
Python or Go pipelines, opt for the portable Runner.
+
+The Spark Runner can execute Spark pipelines similar to a native Spark 
application, allowing deployment as a self-contained application for local 
mode, running on Spark Standalone Resource Manager (RM), or using YARN or Mesos.
+
+To execute your Apache Beam pipeline on a Spark Standalone RM, follow these 
steps:
+
+***Java-based Non-portable Spark Runners (Java Only)***
+
+***1. Specify Dependencies:***
+
+In the `pom.xml` file of your Java project directory, specify your dependency 
on the latest version of the Spark Runner:
+
+```java
+<dependency>
+  <groupId>org.apache.beam</groupId>
+  <artifactId>beam-runners-spark-3</artifactId>
+  <version>2.54.0</version>
+</dependency>
+```
+
+***2. Deploy Spark with Your Application:***
+
+When running pipelines in a Spark Standalone mode, ensure that your 
self-contained application includes Spark dependencies explicitly in your 
`pom.xml` file:
+
+```java
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-core_2.12</artifactId>
+  <version>${spark.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming_2.12</artifactId>
+  <version>${spark.version}</version>
+</dependency>
+```
+
+Shade the application JAR using the Maven shade plugin and make sure the 
shaded JAR file is visible in the target directory by running `ls target`.
+
+To run pipelines in a Spark Standalone mode using the legacy RDD/DStream-based 
Spark Runner, use the following command:
+
+```java
+spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner
+```
+
+To run pipelines in a Spark Standalone mode using the Structured Streaming 
Spark Runner, run the following command:
+
+```java
+spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner
+```
+
+***3. Configure Pipeline Options:***
+
+Set the runner option in your pipeline options to specify that you want to use 
the Spark Runner. In Java, you can do this as follows:
+
+```java
+SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+options.setRunner(SparkRunner.class);
+```
+
+For additional pipeline configuration options, refer to the Spark Runner 
documentation.
+
+***4. Run Your Pipeline:***
+
+In Java, you can use the `PipelineRunner` to run your pipeline:
+
+```java
+Pipeline p = Pipeline.create(options);
+// Add transforms to your pipeline
+p.run();
+```
+
+***5. Monitor Your Job:***
+
+Monitor the execution of your pipeline using the Apache Spark Web Interfaces, 
which provides information about tasks, stages, and overall progress. Access 
the Spark UI by navigating to the appropriate URL (usually `localhost:4040`). 
Metrics are also accessible via the Apache Beam REST API. Apache Spark offers a 
metrics system for reporting metrics to various sinks.
+
+***Portable Spark Runner (Python)***
+
+***1. Deploy Spark with Your Application:***
+
+You will need Docker installed in your execution environment. Pre-built Spark 
Job Service Docker images are available on Docker Hub.
+
+Start the JobService endpoint:
+
+```python
+docker run --net=host apache/beam_spark_job_server:latest
+```
+A Beam JobService is a central instance where you submit your Apache Beam 
pipeline. It needs to be provided with the Spark master address to create a job 
for execution on your Spark cluster.
+
+Submit the Python pipeline to this endpoint, providing Beam JobService with 
the Spark master address to execute the job on a Spark cluster:
+
+```python
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+options = PipelineOptions([
+    "--runner=PortableRunner",
+    "--job_endpoint=localhost:8099",  # localhost:8099 is the default address of the JobService
+    "--environment_type=LOOPBACK"
+])
+with beam.Pipeline(options) as p:
+    ...
+```
+
+***2. Configure Pipeline Options:***
+
+Set the runner option in your pipeline options to specify that you want to use 
the Spark Runner. In Python, you can do this as follows:
+
+```python
+from apache_beam.options.pipeline_options import PipelineOptions
+options = PipelineOptions()
+options.view_as(SparkRunnerOptions).runner = 'SparkRunner'
+```
+
+For additional pipeline configuration options, refer to the Spark Runner 
documentation.
+
+***3. Run Your Pipeline:***
+
+In Python, you can use the `run()` method of your pipeline object to execute 
the pipeline:
+
+```python
+    # Run your pipeline
+    p.run()

Review Comment:
   ```suggestion
   # Run your pipeline
   p.run()
   ```


