andreydevyatkin commented on code in PR #30873: URL: https://github.com/apache/beam/pull/30873#discussion_r1555991358
########## learning/prompts/documentation-lookup-nolinks/09_basic_triggers.md: ########## @@ -0,0 +1,30 @@ +Prompt: +What is a trigger in Apache Beam? + +Response: +Apache Beam uses triggers to determine when to emit the aggregated results of each window, referred to as a pane. + +Triggers provide two additional capabilities compared to outputting at the end of a window: +- Allowing early results to be output before the end of the window. +- Allowing late data to be handled after the end of the window. + +This feature lets you control the flow of your data and balance between completeness, latency, and cost. + +You set the triggers for a `PCollection` by setting the `trigger` parameter of the `WindowInto` transform. + +```python + pcollection | WindowInto( + FixedWindows(1 * 60), + trigger=AfterProcessingTime(1 * 60), + accumulation_mode=AccumulationMode.DISCARDING) Review Comment: ```suggestion pcollection | WindowInto( FixedWindows(1 * 60), trigger=AfterProcessingTime(1 * 60), accumulation_mode=AccumulationMode.DISCARDING, ) ``` ########## learning/prompts/documentation-lookup-nolinks/25_io_csv.md: ########## @@ -0,0 +1,29 @@ +Prompt: +Is CSV format supported in Apache Beam? + +Response: +CSV is a data format for storing and exchanging data. Apache Beam supports reading data from and writing data to CSV files using the `ReadFromText` and `WriteToText` transforms in the `TextIO` module. For more information, see the TextIO connector documentation for your programming language of choice. + +To get started with CSV and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from a CSV file: + +```python +class CsvOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--file_path', + default="gs://your-bucket/your-file.csv", + help='Csv file path' + ) + +options = CsvOptions() + +with beam.Pipeline(options=options) as p: + output = (p | "Read from Csv file" >> ReadFromCsv( + path=options.file_path + ) + | "Log Data" >> Map(logging.info)) + Review Comment: ```suggestion class CsvOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( "--file_path", default="gs://your-bucket/your-file.csv", help="Csv file path", ) options = CsvOptions() with beam.Pipeline(options=options) as p: output = ( p | "Read from Csv file" >> ReadFromCsv(path=options.file_path) | "Log Data" >> Map(logging.info) ) ``` ########## learning/prompts/documentation-lookup-nolinks/20_io_biguery.md: ########## @@ -0,0 +1,41 @@ +Prompt: +Is BigQuery supported in Apache Beam? + +Response: +Yes, Apache Beam supports BigQuery. BigQuery is a serverless and cost-effective enterprise data warehouse offered by Google Cloud. Apache Beam provides a BigQueryIO connector to read and write data from and to BigQuery. The BigQueryIO connector supports both batch and streaming pipelines. + +The following Apache Beam SDKs support the BigQueryIO connector: +* Java (natively) +* Python (natively) +* Go (natively and through X Language) +* Typescript (through X Language) + +To read data from BigQuery, use the `ReadFromBigQuery` function. Apache Beam can read data directly from a BigQuery table or using an SQL query. The default mode is to return table rows read from a BigQuery source as dictionaries. Built-in `TableRow` objects can also be returned. 
+ +Here is an example of Apache Beam pipeline code for reading from BigQuery: + +```python +from apache_beam.io.gcp.bigquery import ReadFromBigQuery + +with beam.Pipeline(options=options) as p: + # read from a table + lines_table = p | 'Read' >> ReadFromBigQuery(table=table) + # read from a query + lines_query = p | 'Read' >> ReadFromBigQuery(query="SELECT * FROM table") +``` + +Here is an example of Apache Beam pipeline code for writing to BigQuery: + +```python +from apache_beam.io.gcp.bigquery import WriteToBigQuery + +with beam.Pipeline(options=options) as p: + # write to a table + p | 'Write' >> beam.io.WriteToBigQuery( + table, + schema=TABLE_SCHEMA, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) Review Comment: ```suggestion from apache_beam.io.gcp.bigquery import WriteToBigQuery with beam.Pipeline(options=options) as p: # write to a table p | "Write" >> beam.io.WriteToBigQuery( table, schema=TABLE_SCHEMA, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, ) ``` ########## learning/prompts/documentation-lookup-nolinks/18_io_kafka.md: ########## @@ -0,0 +1,24 @@ +Prompt: +Is Apache Kafka supported in Apache Beam? + +Response: +Apache Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, and fast. + +Apache Beam has a built-in KafkaIO connector for Apache Kafka that enables Apache Beam pipelines to read data from and write data to Kafka. Apache Kafka is supported in both batch and streaming pipelines and can serve as both a source and a sink. For more information, refer to the KafkaIO connector documentation for your preferred programming language. + +To get started with Apache Kafka and Apache Beam, refer to the Dataflow Cookbook repository. + +Here is an example of Apache Beam pipeline code for reading data from a Kafka topic: + +```python +with beam.Pipeline(options=options) as p: + output = (p | "Read from Kafka" >> ReadFromKafka( + consumer_config={'bootstrap.servers': 'localhost:9092'}, + topics=['my_topic'], + with_metadata=False) + | "Log Data" >> Map(logging.info)) Review Comment: ```suggestion with beam.Pipeline(options=options) as p: output = ( p | "Read from Kafka" >> ReadFromKafka( consumer_config={"bootstrap.servers": "localhost:9092"}, topics=["my_topic"], with_metadata=False, ) | "Log Data" >> Map(logging.info) ) ``` ########## learning/prompts/documentation-lookup-nolinks/28_io_jdbc.md: ########## @@ -0,0 +1,48 @@ +Prompt: +Is Oracle Java Database Connectivity (JDBC) supported in Apache Beam? + +Response: +The Oracle Java Database Connectivity (JDBC) API is a Java API for connecting to relational databases. Apache Beam supports reading and writing data with JDBC using the `ReadFromJdbc` and `WriteToJdbc` transforms in the `JdbcIO` module. The JdbcIO connector is native to Java and available through cross-language transforms in Python and Go. + +To get started with JDBC and Apache Beam, refer to the Dataflow Cookbook GitHub repository. 
+ +Here is an example of Apache Beam pipeline code for reading data with JDBC: + +```python +class ExampleRow(NamedTuple): + id: int + name: str + +class ReadJdbcOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + "--driverClassName", + help="JDBC driver class name") + parser.add_argument( + "--url", + help="JDBC URL") + parser.add_argument( + "--username", + help="JDBC username") + parser.add_argument( + "--password", + help="JDBC password") + parser.add_argument( + "--query", + default="SELECT * FROM users", + help="JDBC query") + +options = ReadJdbcOptions() + +with beam.Pipeline(options=options) as p: + + (p | "Read from JDBC" >> ReadFromJdbc( + driverClassName=options.driverClassName, + url=options.url, + username=options.username, + password=options.password, + query=options.query) + | Map(logging.info)) Review Comment: ```suggestion class ExampleRow(NamedTuple): id: int name: str class ReadJdbcOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument("--driverClassName", help="JDBC driver class name") parser.add_argument("--url", help="JDBC URL") parser.add_argument("--username", help="JDBC username") parser.add_argument("--password", help="JDBC password") parser.add_argument("--query", default="SELECT * FROM users", help="JDBC query") options = ReadJdbcOptions() with beam.Pipeline(options=options) as p: ( p | "Read from JDBC" >> ReadFromJdbc( driverClassName=options.driverClassName, url=options.url, username=options.username, password=options.password, query=options.query, ) | Map(logging.info) ) ``` ########## learning/prompts/documentation-lookup-nolinks/27_io_parquet.md: ########## @@ -0,0 +1,27 @@ +Prompt: +Is Apache Parquet format supported in Apache Beam? + +Response: +Apache Parquet is a data format for storing and exchanging data. Apache Beam supports reading from and writing to Parquet files using the `ReadFromParquet` and `WriteToParquet` transforms in the `ParquetIO` module. For more information, see the ParquetIO connector documentation for your programming language of choice. + +To get started with Parquet and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from a Parquet file: + +```python +class ReadParquetOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + "--path", + help="GCS path to parquet file") + + options = ReadParquetOptions() + + with beam.Pipeline(options=options) as p: + + (p | "ReadParquet" >> ReadFromParquet(options.path) + | Map(logging.info)) + Review Comment: ```suggestion class ReadParquetOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument("--path", help="GCS path to parquet file") options = ReadParquetOptions() with beam.Pipeline(options=options) as p: (p | "ReadParquet" >> ReadFromParquet(options.path) | Map(logging.info)) ``` ########## learning/prompts/documentation-lookup-nolinks/22_io_spanner.md: ########## @@ -0,0 +1,28 @@ +Prompt: +Is Cloud Spanner supported in Apache Beam? + +Response: +Cloud Spanner is a Google Cloud service that provides a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.
+ +Apache Beam has a built-in SpannerIO connector for Cloud Spanner that enables Apache Beam pipelines to read data from and write data to Cloud Spanner. Cloud Spanner is supported in both batch pipelines and streaming pipelines and can serve both as a source and a sink. For more information, see the SpannerIO connector documentation for Java, Python, or Go. + +To get started with Cloud Spanner and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from a Cloud Spanner table using an SQL query: + +```python +class ExampleRow(NamedTuple): + id: int + name: str + +with beam.Pipeline(options=options) as p: + output = (p | "Read from table" >> ReadFromSpanner( + project_id=options.project_id, + instance_id=options.instance_id, + database_id=options.database_id, + row_type=ExampleRow, + sql="SELECT * FROM example_row" + ) + | "Map Data" >> Map(lambda row: f"Id = {row.id}, Name = {row.name}") + | "Log Data" >> Map(logging.info)) Review Comment: ```suggestion class ExampleRow(NamedTuple): id: int name: str with beam.Pipeline(options=options) as p: output = ( p | "Read from table" >> ReadFromSpanner( project_id=options.project_id, instance_id=options.instance_id, database_id=options.database_id, row_type=ExampleRow, sql="SELECT * FROM example_row", ) | "Map Data" >> Map(lambda row: f"Id = {row.id}, Name = {row.name}") | "Log Data" >> Map(logging.info) ) ``` ########## learning/prompts/documentation-lookup-nolinks/20_io_biguery.md: ########## @@ -0,0 +1,41 @@ +Prompt: +Is BigQuery supported in Apache Beam? + +Response: +Yes, Apache Beam supports BigQuery. BigQuery is a serverless and cost-effective enterprise data warehouse offered by Google Cloud. Apache Beam provides a BigQueryIO connector to read and write data from and to BigQuery. The BigQueryIO connector supports both batch and streaming pipelines. + +The following Apache Beam SDKs support the BigQueryIO connector: +* Java (natively) +* Python (natively) +* Go (natively and through X Language) +* Typescript (through X Language) + +To read data from BigQuery, use the `ReadFromBigQuery` function. Apache Beam can read data directly from a BigQuery table or using an SQL query. The default mode is to return table rows read from a BigQuery source as dictionaries. Built-in `TableRow` objects can also be returned. + +Here is an example of Apache Beam pipeline code for reading from BigQuery: + +```python +from apache_beam.io.gcp.bigquery import ReadFromBigQuery + +with beam.Pipeline(options=options) as p: + # read from a table + lines_table = p | 'Read' >> ReadFromBigQuery(table=table) + # read from a query + lines_query = p | 'Read' >> ReadFromBigQuery(query="SELECT * FROM table") Review Comment: ```suggestion from apache_beam.io.gcp.bigquery import ReadFromBigQuery with beam.Pipeline(options=options) as p: # read from a table lines_table = p | "Read" >> ReadFromBigQuery(table=table) # read from a query lines_query = p | "Read" >> ReadFromBigQuery(query="SELECT * FROM table") ``` ########## learning/prompts/documentation-lookup-nolinks/08_basic_windowing.md: ########## @@ -0,0 +1,31 @@ +Prompt: +What is windowing in Apache Beam? + +Response: +In stream processing, windowing allows you to divide data streams into logical units for efficient and correct parallel processing. + +With an unbounded data set, collecting all of the elements is impossible, because new elements are constantly being added. 
In the Apache Beam model, any `PCollection` (including unbounded `PCollection` objects) can be subdivided into logical windows. Grouping transforms then consider the elements of each `PCollection` on a per-window basis. + +Because Apache Beam's default windowing strategy is to assign each element to a single, global window, you must explicitly specify a windowing function for your pipeline. + +The following code snippet shows how to divide a `PCollection` into 60-second windows: + +```python +import apache_beam as beam +from apache_beam import window +fixed_windowed_items = ( + items | 'window' >> beam.WindowInto(window.FixedWindows(60))) Review Comment: ```suggestion import apache_beam as beam from apache_beam import window fixed_windowed_items = items | 'window' >> beam.WindowInto(window.FixedWindows(60)) ``` ########## learning/prompts/documentation-lookup-nolinks/07_basic_runners.md: ########## @@ -0,0 +1,25 @@ +Prompt: +What is a runner in Apache Beam? + +Response: +Apache Beam runners are the execution engines that run the pipelines. They translate or adapt the pipeline into a form that can be executed on a massively parallel big data processing system, such as Apache Flink, Apache Spark, Google Cloud Dataflow, and more. + +Choosing a runner is an important step in the pipeline development process. The runner you choose determines where and how your pipeline runs. For more information about available runners and their capabilities, see the Beam Capability Matrix. + +To specify a runner when you execute the pipeline, use the `--runner` flag. For example, to run the WordCount pipeline on Google Cloud Dataflow, you would run the following command: + +```python +python -m apache_beam.examples.wordcount \ + --region DATAFLOW_REGION \ + --input gs://dataflow-samples/shakespeare/kinglear.txt \ + --output gs://STORAGE_BUCKET/results/outputs \ + --runner DataflowRunner \ + --project PROJECT_ID \ + --temp_location gs://STORAGE_BUCKET/tmp/ + ``` Review Comment: nit: extra spaces need to be removed ########## learning/prompts/documentation-lookup-nolinks/24_io_json.md: ########## @@ -0,0 +1,28 @@ +Prompt: +Is JSON supported in Apache Beam? + +Response: +JSON is a data format for storing and exchanging data. Apache Beam supports reading data from and writing data to JSON files using the `ReadFromJson` and `WriteToJson` transforms within the `TextIO` module. For more information, see the TextIO connector documentation for your programming language of choice. + +To get started with JSON and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from a JSON file: + +```python +class JsonOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + '--file_path', + default="gs://your-bucket/your-file.json", + help='Json file path' + ) + +options = JsonOptions() +with beam.Pipeline(options=options) as p: + output = (p | "Read from Json file" >> ReadFromJson( + path=options.file_path, + lines=False + ) + | "Log Data" >> Map(logging.info)) Review Comment: ```suggestion class JsonOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( "--file_path", default="gs://your-bucket/your-file.json", help="Json file path", ) options = JsonOptions() with beam.Pipeline(options=options) as p: output = ( p | "Read from Json file" >> ReadFromJson(path=options.file_path, lines=False) | "Log Data" >> Map(logging.info) ) ```
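The file above names both `ReadFromJson` and `WriteToJson` but only shows the read side. For reference, a write-side counterpart might look like the following sketch; the output path is a placeholder, and it assumes the `PCollection` carries schema-aware elements such as `beam.Row`:

```python
import apache_beam as beam
from apache_beam.io import WriteToJson

# Sketch only: WriteToJson expects a schema-aware PCollection,
# so the example creates beam.Row elements.
with beam.Pipeline(options=options) as p:
    (
        p
        | "Create rows" >> beam.Create([beam.Row(id=1, name="example")])
        | "Write to Json file" >> WriteToJson(path="gs://your-bucket/output")
    )
```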
########## learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md: ########## @@ -0,0 +1,80 @@ +Prompt: +How to execute Apache Beam pipelines with Cloud Dataflow? + +Response: +Apache Beam supports multiple runners for executing pipelines on various platforms. One such runner is the Cloud Dataflow Runner, tailored for running pipelines on the Google Cloud Dataflow service. Cloud Dataflow offers fully managed and unified stream and batch data processing, boasting dynamic work rebalancing and built-in autoscaling capabilities. + +When you execute a pipeline on Cloud Dataflow, the Runner uploads your code and dependencies to a Cloud Storage bucket and creates a Dataflow job, which then executes your pipeline on managed resources within the Google Cloud Platform. + +To execute Apache Beam pipelines using the Cloud Dataflow Runner, follow these steps: + +***1. Set Up Your Cloud Project and Resources:*** + +Complete the steps outlined in the 'Before You Begin' section of the Cloud Dataflow quickstart for your chosen programming language: +1. Select or create a Google Cloud Platform Console project. +2. Enable billing for your project. +3. Enable the required Google Cloud APIs, including Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, and Cloud Resource Manager. Additional APIs may be necessary depending on your pipeline code. +4. Authenticate with Google Cloud Platform. +5. Install the Google Cloud SDK. +6. Create a Cloud Storage bucket. + +***2. Specify Dependencies (Java Only):*** + +When using the Apache Beam Java SDK, specify your dependency on the Cloud Dataflow Runner in the `pom.xml` file of your Java project directory. + +```java +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <version>2.54.0</version> + <scope>runtime</scope> +</dependency> +``` + +Ensure that you include all necessary dependencies to create a self-contained application. In some cases, such as when starting a pipeline using a scheduler, you'll need to package a self-executing JAR by explicitly adding a dependency in the Project section of your `pom.xml` file. For more details about running self-executing JARs on Cloud Dataflow, refer to the 'Self-executing JAR' section in the Apache Beam documentation on Cloud Dataflow Runner. + +***3. Configure Pipeline Options:*** + +Configure the execution details, including the runner (set to `dataflow` or `DataflowRunner`), Cloud project ID, region, and streaming mode, using the `GoogleCloudOptions` interface for Python or the `DataflowPipelineOptions` interface for Java. + +You can utilize pipeline options to control various aspects of how Cloud Dataflow executes your job. For instance, you can specify whether your pipeline runs on worker virtual machines, on the Cloud Dataflow service backend, or locally. For additional pipeline configuration options, refer to the reference documentation for the respective interface.
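For illustration, a minimal Python configuration for this step might look like the following sketch; the project, region, and bucket values are placeholders:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
)

# Sketch only: select the Dataflow runner and set the Google Cloud execution details.
options = PipelineOptions()
options.view_as(StandardOptions).runner = "DataflowRunner"
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = "PROJECT_ID"
gcp_options.region = "REGION"
gcp_options.temp_location = "gs://BUCKET_NAME/temp/"

with beam.Pipeline(options=options) as p:
    ...  # add your transforms here
```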
+ +***4. Run Your Pipeline on Cloud Dataflow:*** + +Execute your pipeline on Cloud Dataflow using the appropriate command for your SDK. + +The following example code, taken from the Cloud Dataflow quickstarts for Java and Python, shows how to run the WordCount example pipeline on Dataflow. + +For the Apache Beam Java SDK, in your terminal, run the following command (from your `word-count-beam` directory): + +```java + mvn -Pdataflow-runner compile exec:java \ + -Dexec.mainClass=org.apache.beam.examples.WordCount \ + -Dexec.args="--project=PROJECT_ID \ + --gcpTempLocation=gs://BUCKET_NAME/temp/ \ + --output=gs://BUCKET_NAME/output \ + --runner=DataflowRunner \ + --region=REGION Review Comment: ```suggestion mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION ``` ########## learning/prompts/documentation-lookup-nolinks/19_io_pubsub.md: ########## @@ -0,0 +1,20 @@ +Prompt: +Is Pub/Sub supported in Apache Beam? + +Response: +Pub/Sub is a Google Cloud service that provides a simple, reliable, scalable, and secure real-time messaging service for sending and receiving messages between independent applications. Apache Beam provides a PubSubIO connector that allows you to read and write messages from and to Pub/Sub. Pub/Sub is currently supported only in streaming pipelines. + +Pub/Sub is supported natively in the Beam SDKs for Java, Python, and Go. It is also available in Typescript through X Language. + +To get started with Pub/Sub and Apache Beam, the Dataflow Cookbook GitHub repository provides code examples for reading and writing data from and to Pub/Sub. + +Here is an example of Apache Beam pipeline code for reading data from a Pub/Sub topic and writing it to another topic: + +```python +from apache_beam.io import ReadFromPubSub +from apache_beam.io import WriteToPubSub + + with beam.Pipeline(options=options) as p: + (p | "Read from Pub/Sub" >> ReadFromPubSub(topic="input_topic") + | "Write to Pub/Sub" >> WriteToPubSub(topic="output_topic")) Review Comment: ```suggestion from apache_beam.io import ReadFromPubSub from apache_beam.io import WriteToPubSub with beam.Pipeline(options=options) as p: ( p | "Read from Pub/Sub" >> ReadFromPubSub(topic="input_topic") | "Write to Pub/Sub" >> WriteToPubSub(topic="output_topic") ) ``` ########## learning/prompts/documentation-lookup-nolinks/21_io_bigtable.md: ########## @@ -0,0 +1,24 @@ +Prompt: +Is Cloud Bigtable supported in Apache Beam? + +Response: +Cloud Bigtable is a Google Cloud service that provides an enterprise-grade NoSQL database service with single-digit millisecond latency, limitless scale, and 99.999% availability for large analytical and operational workloads.
+ +Apache Beam has a built-in connector for Cloud Bigtable that lets you read data from and write data to Bigtable. For more information, see the BigtableIO connector documentation for your preferred programming language. + +To get started with Cloud Bigtable and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from a Bigtable table: + +```python +with beam.Pipeline(options=options) as p: +output = (p | "Read from Bigtable" >> ReadFromBigtable( + project_id=project_id, + instance_id=instance_id, + table_id=table_id, + ) + | "Extract cells" >> beam.Map(lambda row: row._cells) + | "Log Data" >> Map(logging.info)) Review Comment: ```suggestion with beam.Pipeline(options=options) as p: output = ( p | "Read from Bigtable" >> ReadFromBigtable( project_id=project_id, instance_id=instance_id, table_id=table_id, ) | "Extract cells" >> beam.Map(lambda row: row._cells) | "Log Data" >> Map(logging.info) ) ``` ########## learning/prompts/documentation-lookup-nolinks/29_advanced_side_inputs.md: ########## @@ -0,0 +1,36 @@ +Prompt: +What are side inputs in Apache Beam? + +Response: +In Apache Beam, side inputs are additional inputs that can be provided to a `ParDo` transform, in addition to the main input `PCollection`. These side inputs can be used by the `ParDo` transform's `DoFn` to access and read additional data during the processing of each element in the main input `PCollection`. + +Side inputs are particularly useful when a Beam pipeline needs to ingest additional data dynamically at runtime when processing each element, rather than relying on preset or hard-coded values. By leveraging side inputs, Apache Beam can determine additional data based on the main `PCollection` data or even from a different branch of your pipeline. For example, you can leverage side inputs for your streaming analytics use cases, enriching streaming events with additional data retrieved from a lookup table. + +In the Apache Beam Python SDK, you can provide side inputs as extra arguments in the `DoFn`'s `process` method or the `Map`/`FlatMap` transforms. The Python SDK supports optional, positional, and keyword arguments. + +```python +class MyDoFn(beam.DoFn): + def process(self, element, side_input): + ... +``` + +In the Apache Beam Java SDK, side inputs are provided to the `ParDo` transform by using the `.withSideInputs` method. To access a side input from within a `DoFn`, the `DoFn.ProcessContext.sideInput` method is invoked. + +```java +PCollection<Integer> input = ...; +PCollectionView<Integer> sideInput = ...; +PCollection<Integer> output = + input.apply(ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { + Integer sideInputValue = c.sideInput(sideInput); + ... + } + }).withSideInputs(sideInput)); Review Comment: ```suggestion PCollection<Integer> input = ...; PCollectionView<Integer> sideInput = ...; PCollection<Integer> output = input.apply(ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { Integer sideInputValue = c.sideInput(sideInput); ... } }).withSideInputs(sideInput)); ``` ########## learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md: ########## @@ -0,0 +1,101 @@ +Prompt: +What are additional outputs in Apache Beam? 
+ +Response: +In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra `PCollection` objects produced by a single `ParDo` transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection` objects and return them bundled together with the main output `PCollection`. + +Additional outputs serve as a mechanism to implement pipeline branching. You can use them when there is a need to split the output of a single transform into several `PCollection` objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once. + +Producing additional outputs requires tagging each output `PCollection` with a unique identifier, which is then used to emit elements to the corresponding output. + +In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag` objects for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. + +The following Java code implements two additional output `PCollection` objects for string and integer values in addition to the main output `PCollection` of strings: + +```java +// Input PCollection that contains strings. + PCollection<String> input = ...; +// Output tag for the main output PCollection of strings. +final TupleTag<String> mainOutputTag = new TupleTag<String>(){}; +// Output tag for the additional output PCollection of strings. +final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){}; +// Output tag for the additional output PCollection of integers. +final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){}; + +PCollectionTuple results = input.apply(ParDo + .of(new DoFn<String, String>() { + // DoFn continues here. + ... + }) + // Specify the tag for the main output. + .withOutputTags(mainOutputTag, + // Specify the tags for the two additional outputs as a TupleTagList. + TupleTagList.of(additionalOutputTagString) + .and(additionalOutputTagIntegers))); Review Comment: ```suggestion // Input PCollection that contains strings. PCollection<String> input = ...; // Output tag for the main output PCollection of strings. final TupleTag<String> mainOutputTag = new TupleTag<String>() {}; // Output tag for the additional output PCollection of strings. final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>() {}; // Output tag for the additional output PCollection of integers. final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>() {}; PCollectionTuple results = input.apply(ParDo .of(new DoFn<String, String>() { // DoFn continues here. ... }) // Specify the tag for the main output. .withOutputTags(mainOutputTag, // Specify the tags for the two additional outputs as a TupleTagList. TupleTagList.of(additionalOutputTagString) .and(additionalOutputTagIntegers))); ``` ########## learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md: ########## @@ -0,0 +1,101 @@ +Prompt: +What are additional outputs in Apache Beam? 
+ +Response: +In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra `PCollection` objects produced by a single `ParDo` transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection` objects and return them bundled together with the main output `PCollection`. + +Additional outputs serve as a mechanism to implement pipeline branching. You can use them when there is a need to split the output of a single transform into several `PCollection` objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once. + +Producing additional outputs requires tagging each output `PCollection` with a unique identifier, which is then used to emit elements to the corresponding output. + +In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag` objects for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. + +The following Java code implements two additional output `PCollection` objects for string and integer values in addition to the main output `PCollection` of strings: + +```java +// Input PCollection that contains strings. + PCollection<String> input = ...; +// Output tag for the main output PCollection of strings. +final TupleTag<String> mainOutputTag = new TupleTag<String>(){}; +// Output tag for the additional output PCollection of strings. +final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){}; +// Output tag for the additional output PCollection of integers. +final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){}; + +PCollectionTuple results = input.apply(ParDo + .of(new DoFn<String, String>() { + // DoFn continues here. + ... + }) + // Specify the tag for the main output. + .withOutputTags(mainOutputTag, + // Specify the tags for the two additional outputs as a TupleTagList. + TupleTagList.of(additionalOutputTagString) + .and(additionalOutputTagIntegers))); +``` + +The `processElement` method can emit elements to the main output or any additional output by invoking the output method on the `MultiOutputReceiver` object. The output method takes the tag of the output and the element to be emitted as arguments. + +```java +public void processElement(@Element String word, MultiOutputReceiver out) { + if (condition for main output) { + // Emit element to main output + out.get(mainOutputTag).output(word); + } else { + // Emit element to additional string output + out.get(additionalOutputTagString).output(word); + } + if (condition for additional integer output) { + // Emit element to additional integer output + out.get(additionalOutputTagIntegers).output(word.length()); + } + } +``` + +In the Apache Beam Python SDK, you can implement additional outputs by invoking the `with_outputs()` method on the `ParDo` transform and specifying the expected tags for the multiple outputs. 
+ +The following Python code demonstrates how to implement additional outputs for a `ParDo` transform that outputs two `PCollection` objects of strings and integers in addition to the main output `PCollection` of strings: + +```python +class SplitLinesToWordsFn(beam.DoFn): + + # These tags will be used to tag the outputs of this DoFn. + OUTPUT_TAG_SHORT_WORDS = 'tag_short_words' + OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count' + + def process(self, element): + # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection. + yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element)) + + words = re.findall(r'[A-Za-z\']+', element) + for word in words: + if len(word) <= 3: + # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection. + yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word) + else: + # yield word to add it to the main collection. + yield word Review Comment: ```suggestion class SplitLinesToWordsFn(beam.DoFn): # These tags will be used to tag the outputs of this DoFn. OUTPUT_TAG_SHORT_WORDS = "tag_short_words" OUTPUT_TAG_CHARACTER_COUNT = "tag_character_count" def process(self, element): # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection. yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element)) words = re.findall(r"[A-Za-z\']+", element) for word in words: if len(word) <= 3: # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection. yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word) else: # yield word to add it to the main collection. yield word ``` ########## learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md: ########## @@ -0,0 +1,101 @@ +Prompt: +What are additional outputs in Apache Beam? + +Response: +In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra `PCollection` objects produced by a single `ParDo` transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection` objects and return them bundled together with the main output `PCollection`. + +Additional outputs serve as a mechanism to implement pipeline branching. You can use them when there is a need to split the output of a single transform into several `PCollection` objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once. + +Producing additional outputs requires tagging each output `PCollection` with a unique identifier, which is then used to emit elements to the corresponding output. + +In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag` objects for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. + +The following Java code implements two additional output `PCollection` objects for string and integer values in addition to the main output `PCollection` of strings: + +```java +// Input PCollection that contains strings. + PCollection<String> input = ...; +// Output tag for the main output PCollection of strings. +final TupleTag<String> mainOutputTag = new TupleTag<String>(){}; +// Output tag for the additional output PCollection of strings. 
+final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){}; +// Output tag for the additional output PCollection of integers. +final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){}; + +PCollectionTuple results = input.apply(ParDo + .of(new DoFn<String, String>() { + // DoFn continues here. + ... + }) + // Specify the tag for the main output. + .withOutputTags(mainOutputTag, + // Specify the tags for the two additional outputs as a TupleTagList. + TupleTagList.of(additionalOutputTagString) + .and(additionalOutputTagIntegers))); +``` + +The `processElement` method can emit elements to the main output or any additional output by invoking the output method on the `MultiOutputReceiver` object. The output method takes the tag of the output and the element to be emitted as arguments. + +```java +public void processElement(@Element String word, MultiOutputReceiver out) { + if (condition for main output) { + // Emit element to main output + out.get(mainOutputTag).output(word); + } else { + // Emit element to additional string output + out.get(additionalOutputTagString).output(word); + } + if (condition for additional integer output) { + // Emit element to additional integer output + out.get(additionalOutputTagIntegers).output(word.length()); + } + } Review Comment: ```suggestion public void processElement(@Element String word, MultiOutputReceiver out) { if (condition for main output) { // Emit element to main output out.get(mainOutputTag).output(word); } else { // Emit element to additional string output out.get(additionalOutputTagString).output(word); } if (condition for additional integer output) { // Emit element to additional integer output out.get(additionalOutputTagIntegers).output(word.length()); } } ``` ########## learning/prompts/documentation-lookup-nolinks/26_io_avro.md: ########## @@ -0,0 +1,27 @@ +Prompt: +Is Apache Avro format supported in Apache Beam? + +Response: +Apache Avro is a data format for storing and exchanging data. Apache Beam supports reading from and writing to Avro files using the `ReadFromAvro` and `WriteToAvro` transforms in the `AvroIO` module. For more information, see the AvroIO connector documentation for your programming language of choice. + +To get started with Avro and Apache Beam, refer to the Dataflow Cookbook GitHub repository. + +Here is an example of Apache Beam pipeline code for reading data from an Avro file: + +```python +class ReadAvroOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument( + "--path", + default="gs://cloud-samples-data/bigquery/us-states/*.avro", + help="GCS path to read from") + + options = ReadAvroOptions() + + with beam.Pipeline(options=options) as p: + + (p | "Read from Avro" >> ReadFromAvro(options.path) + | Map(logging.info)) Review Comment: ```suggestion class ReadAvroOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( "--path", default="gs://cloud-samples-data/bigquery/us-states/*.avro", help="GCS path to read from", ) options = ReadAvroOptions() with beam.Pipeline(options=options) as p: (p | "Read from Avro" >> ReadFromAvro(options.path) | Map(logging.info)) ``` ########## learning/prompts/documentation-lookup-nolinks/53_dataflow_runner.md: ########## @@ -0,0 +1,80 @@ +Prompt: +How to execute Apache Beam pipelines with Cloud Dataflow? + +Response: +Apache Beam supports multiple runners for executing pipelines on various platforms. 
One such runner is the Cloud Dataflow Runner, tailored for running pipelines on the Google Cloud Dataflow service. Cloud Dataflow offers fully managed and unified stream and batch data processing, boasting dynamic work rebalancing and built-in autoscaling capabilities. + +When you execute a pipeline on Cloud Dataflow, the Runner uploads your code and dependencies to a Cloud Storage bucket and creates a Dataflow job, which then executes your pipeline on managed resources within the Google Cloud Platform. + +To execute Apache Beam pipelines using the Cloud Dataflow Runner, follow these steps: + +***1. Setup Your Cloud Project and Resources:*** + +Complete the steps outlined in the 'Before You Begin' section of the Cloud Dataflow quickstart for your chosen programming language: +1. Select or create a Google Cloud Platform Console project. +2. Enable billing for your project. +3. Enable the required Google Cloud APIs, including Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, and Cloud Resource Manager. Additional APIs may be necessary depending on your pipeline code. +4. Authenticate with Google Cloud Platform. +5. Install the Google Cloud SDK. +6. Create a Cloud Storage bucket. + +***2. Specify Dependencies (Java Only):*** + +When using the Apache Beam Java SDK, specify your dependency on the Cloud Dataflow Runner in the `pom.xml` file of your Java project directory. + +```java +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <version>2.54.0</version> + <scope>runtime</scope> +</dependency> +``` + +Ensure that you include all necessary dependencies to create a self-contained application. In some cases, such as when starting a pipeline using a scheduler, you'll need to package a self-executing JAR by explicitly adding a dependency in the Project section of your `pom.xml` file. For more details about running self-executing JARs on Cloud Dataflow, refer to the 'Self-executing JAR' section in the Apache Beam documentation on Cloud Dataflow Runner. + +***3. Configure Pipeline Options:*** + +Configure the execution details, including the runner (set to `dataflow` or `DataflowRunner`), Cloud project ID, region, and streaming mode, using the `GoogleCloudOptions` interface for Python or the `DataflowPipelineOptions` interface for Java. + +You can utilize pipeline options to control various aspects of how Cloud Dataflow executes your job. For instance, you can specify whether your pipeline runs on worker virtual machines, on the Cloud Dataflow service backend, or locally. For additional pipeline configuration options, refer to the reference documentation for the respective interface. + +***4. Run Your Pipeline on Cloud Dataflow:*** + +Execute your pipeline on Cloud Dataflow using the appropriate command for your SDK. + +The following example code, taken from the Cloud Dataflow quickstarts for Java and Python, shows how to run the WordCount example pipeline on Dataflow. 
+ +For the Apache Beam Java SDK, in your terminal, run the following command (from your `word-count-beam` directory): + +```java + mvn -Pdataflow-runner compile exec:java \ + -Dexec.mainClass=org.apache.beam.examples.WordCount \ + -Dexec.args="--project=PROJECT_ID \ + --gcpTempLocation=gs://BUCKET_NAME/temp/ \ + --output=gs://BUCKET_NAME/output \ + --runner=DataflowRunner \ + --region=REGION + ``` Review Comment: nit: remove extra spaces ########## learning/prompts/documentation-lookup-nolinks/30_advanced_additional_outputs.md: ########## @@ -0,0 +1,101 @@ +Prompt: +What are additional outputs in Apache Beam? + +Response: +In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra `PCollection` objects produced by a single `ParDo` transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection` objects and return them bundled together with the main output `PCollection`. + +Additional outputs serve as a mechanism to implement pipeline branching. You can use them when there is a need to split the output of a single transform into several `PCollection` objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once. + +Producing additional outputs requires tagging each output `PCollection` with a unique identifier, which is then used to emit elements to the corresponding output. + +In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag` objects for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. + +The following Java code implements two additional output `PCollection` objects for string and integer values in addition to the main output `PCollection` of strings: + +```java +// Input PCollection that contains strings. + PCollection<String> input = ...; +// Output tag for the main output PCollection of strings. +final TupleTag<String> mainOutputTag = new TupleTag<String>(){}; +// Output tag for the additional output PCollection of strings. +final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){}; +// Output tag for the additional output PCollection of integers. +final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){}; + +PCollectionTuple results = input.apply(ParDo + .of(new DoFn<String, String>() { + // DoFn continues here. + ... + }) + // Specify the tag for the main output. + .withOutputTags(mainOutputTag, + // Specify the tags for the two additional outputs as a TupleTagList. + TupleTagList.of(additionalOutputTagString) + .and(additionalOutputTagIntegers))); +``` + +The `processElement` method can emit elements to the main output or any additional output by invoking the output method on the `MultiOutputReceiver` object. The output method takes the tag of the output and the element to be emitted as arguments. 
+ +```java +public void processElement(@Element String word, MultiOutputReceiver out) { + if (condition for main output) { + // Emit element to main output + out.get(mainOutputTag).output(word); + } else { + // Emit element to additional string output + out.get(additionalOutputTagString).output(word); + } + if (condition for additional integer output) { + // Emit element to additional integer output + out.get(additionalOutputTagIntegers).output(word.length()); + } + } +``` + +In the Apache Beam Python SDK, you can implement additional outputs by invoking the `with_outputs()` method on the `ParDo` transform and specifying the expected tags for the multiple outputs. + +The following Python code demonstrates how to implement additional outputs for a `ParDo` transform that outputs two `PCollection` objects of strings and integers in addition to the main output `PCollection` of strings: + +```python +class SplitLinesToWordsFn(beam.DoFn): + + # These tags will be used to tag the outputs of this DoFn. + OUTPUT_TAG_SHORT_WORDS = 'tag_short_words' + OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count' + + def process(self, element): + # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection. + yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element)) + + words = re.findall(r'[A-Za-z\']+', element) + for word in words: + if len(word) <= 3: + # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection. + yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word) + else: + # yield word to add it to the main collection. + yield word +``` + +The method returns a `DoOutputsTuple` object, with the specified tags serving as attributes that provide `ParDo` with access to the corresponding output `PCollection` objects. + +```python +with beam.Pipeline(options=pipeline_options) as p: + + lines = p | ReadFromText(known_args.input) + + # with_outputs allows accessing the explicitly tagged outputs of a DoFn. + split_lines_result = ( + lines + | beam.ParDo(SplitLinesToWordsFn()).with_outputs( + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, + SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, + main='words')) + + # split_lines_result is an object of type DoOutputsTuple + words, _, _ = split_lines_result + short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] + character_count = split_lines_result.tag_character_count Review Comment: ```suggestion with beam.Pipeline(options=pipeline_options) as p: lines = p | ReadFromText(known_args.input) # with_outputs allows accessing the explicitly tagged outputs of a DoFn. split_lines_result = lines | beam.ParDo(SplitLinesToWordsFn()).with_outputs( SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, main="words", ) # split_lines_result is an object of type DoOutputsTuple words, _, _ = split_lines_result short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] character_count = split_lines_result.tag_character_count ``` ########## learning/prompts/documentation-lookup-nolinks/54_spark_runner.md: ########## @@ -0,0 +1,148 @@ +Prompt: +How to execute Apache Beam pipelines with Apache Spark? + +Response: +Apache Beam supports multiple runners for executing pipelines on various platforms. One such runner is the Spark Runner, tailored for running pipelines using Apache Spark. The Spark Runner enables you to leverage the scalability and parallel processing capabilities of Apache Spark for your data processing tasks. 
+ +Key features of the Spark Runner include: +* Support for batch, streaming, and unified pipelines. +* Offering the same fault-tolerance guarantees and security features as provided by Apache Spark. +* Built-in metrics reporting using Apache Spark’s metrics system, which also reports Beam Aggregators. +* Native support for Apache Beam side inputs via Apache Spark’s broadcast variables. + +There are three types of Spark Runners available: +1. Legacy Spark Runner: supports Java (and other JVM-based languages) exclusively, based on Apache Spark’s RDD and DStream. +2. Structured Streaming Spark Runner: supports Java (and other JVM-based languages) exclusively, based on Apache Spark's Datasets and Structured Streaming framework. Currently, it only supports batch mode with limited coverage of the Apache Beam model. +3. Portable Spark Runner: supports Java, Python, and Go. + +For Java-based applications, consider using the Java-based runners, while for Python or Go pipelines, opt for the portable Runner. + +The Spark Runner can execute Spark pipelines similar to a native Spark application, allowing deployment as a self-contained application for local mode, running on Spark Standalone Resource Manager (RM), or using YARN or Mesos. + +To execute your Apache Beam pipeline on a Spark Standalone RM, follow these steps: + +***Java-based Non-portable Spark Runners (Java Only)*** + +***1. Specify Dependencies:*** + +In the `pom.xml` file of your Java project directory, specify your dependency on the latest version of the Spark Runner: + +```java +<dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-spark-3</artifactId> + <version>2.54.0</version> +</dependency> +``` + +***2. Deploy Spark with Your Application:*** + +When running pipelines in a Spark Standalone mode, ensure that your self-contained application includes Spark dependencies explicitly in your `pom.xml` file: + +```java +<dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.12</artifactId> + <version>${spark.version}</version> +</dependency> + +<dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.12</artifactId> + <version>${spark.version}</version> +</dependency> +``` + +Shade the application JAR using the Maven shade plugin and make sure the shaded JAR file is visible in the target directory by running `ls target`. + +To run pipelines in a Spark Standalone mode using the legacy RDD/DStream-based Spark Runner, use the following command: + +```java +spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner +``` + +To run pipelines in a Spark Standalone mode using the Structured Streaming Spark Runner, run the following command: + +```java +spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner +``` + +***3. Configure Pipeline Options:*** + +Set the runner option in your pipeline options to specify that you want to use the Spark Runner. In Java, you can do this as follows: + +```java +SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); +options.setRunner(SparkRunner.class); +``` + +For additional pipeline configuration options, refer to the Spark Runner documentation. + +***4. Run Your Pipeline:*** + +In Java, you can use the `PipelineRunner` to run your pipeline: + +```java +Pipeline p = Pipeline.create(options); +// Add transforms to your pipeline +p.run(); +``` + +***5. Monitor Your Job:*** + +Monitor the execution of your pipeline using the Apache Spark Web Interfaces, which provide information about tasks, stages, and overall progress. Access the Spark UI by navigating to the appropriate URL (usually `localhost:4040`). Metrics are also accessible via the Apache Beam REST API. Apache Spark offers a metrics system for reporting metrics to various sinks. + +***Portable Spark Runner (Python)*** + +***1. Deploy Spark with Your Application:*** + +You will need Docker installed in your execution environment. Pre-built Spark Job Service Docker images are available on Docker Hub. + +Start the JobService endpoint: + +```python +docker run --net=host apache/beam_spark_job_server:latest +``` +A Beam JobService is a central instance where you submit your Apache Beam pipeline. It needs to be provided with the Spark master address to create a job for execution on your Spark cluster. + +Submit the Python pipeline to this endpoint, providing Beam JobService with the Spark master address to execute the job on a Spark cluster: + +```python +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +options = PipelineOptions([ + "--runner=PortableRunner", + "--job_endpoint=localhost:8099", # localhost:8099 is the default address of the JobService + "--environment_type=LOOPBACK" +]) +with beam.Pipeline(options) as p: + ... +``` + +***2. Configure Pipeline Options:*** + +Set the runner option in your pipeline options to specify that you want to use the Spark Runner. In Python, you can do this as follows: + +```python +from apache_beam.options.pipeline_options import PipelineOptions +options = PipelineOptions() +options.view_as(SparkRunnerOptions).runner = 'SparkRunner' +``` + +For additional pipeline configuration options, refer to the Spark Runner documentation. + +***3. Run Your Pipeline:*** + +In Python, you can use the `run()` method of your pipeline object to execute the pipeline: + +```python + # Run your pipeline + p.run() Review Comment: ```suggestion # Run your pipeline p.run() ```
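Since `run()` returns a `PipelineResult`, a script that would otherwise exit immediately can block until the pipeline reaches a terminal state; a minimal sketch:

```python
# Sketch only: run the pipeline and keep the result handle.
result = p.run()
# wait_until_finish() blocks until the pipeline reaches a terminal state.
result.wait_until_finish()
```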
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org