This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2dcb15f3d1f Add Dataflow Apache Beam Java streaming system test (#47209)
2dcb15f3d1f is described below
commit 2dcb15f3d1ffa8fa1474d6833a39a81dc3500a96
Author: olegkachur-e <[email protected]>
AuthorDate: Sun Mar 9 21:56:27 2025 +0100
Add Dataflow Apache Beam Java streaming system test (#47209)
- add system test
- update docs
- add sources to the build jar file
Co-authored-by: Oleg Kachur <[email protected]>
---
providers/google/docs/operators/cloud/dataflow.rst | 8 ++
.../dataflow/example_dataflow_java_streaming.py | 145 +++++++++++++++++++++
.../non_python_src/java_streaming_src/README.MD | 58 +++++++++
.../non_python_src/java_streaming_src/pom.xml | 131 +++++++++++++++++++
.../java/org/example/pubsub/StreamingExample.java | 42 ++++++
.../check_providers_subpackages_all_have_init.py | 1 +
6 files changed, 385 insertions(+)
diff --git a/providers/google/docs/operators/cloud/dataflow.rst b/providers/google/docs/operators/cloud/dataflow.rst
index f4b4408f591..f72f7248796 100644
--- a/providers/google/docs/operators/cloud/dataflow.rst
+++ b/providers/google/docs/operators/cloud/dataflow.rst
@@ -138,6 +138,14 @@ Here is an example of creating and running a pipeline in Java with jar stored on
    :start-after: [START howto_operator_start_java_job_local_jar]
    :end-before: [END howto_operator_start_java_job_local_jar]
+Here is an example of creating and running a streaming pipeline in Java with jar stored on GCS:
+
+.. exampleinclude:: /../../providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_java_streaming]
+    :end-before: [END howto_operator_start_java_streaming]
+
.. _howto/operator:PythonSDKPipelines:
Python SDK pipelines
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
new file mode 100644
index 00000000000..cf6c3228fc7
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with Java (streaming).
+
+Important Note:
+    This test downloads a Java JAR file from a public bucket. If the JAR file cannot be downloaded
+    or is not compatible with the Java version used in the test, it can be rebuilt locally:
+    there is no official streaming pipeline example for the Apache Beam Java SDK, so the source code
+    and build instructions are located in
+    `providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/`.
+
+    You can follow the instructions on how to pack a self-executing jar here:
+    https://beam.apache.org/documentation/runners/dataflow/
+
+Requirements:
+    These operators require the gcloud command and Java's JRE to run.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.pubsub import (
+    PubSubCreateTopicOperator,
+    PubSubDeleteTopicOperator,
+)
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "dataflow_java_streaming"
+LOCATION = "europe-west3"
+BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}"
+GCS_TMP = f"gs://{BUCKET_NAME}/temp"
+GCS_OUTPUT = f"gs://{BUCKET_NAME}/DF_OUT"
+RESOURCE_BUCKET = "airflow-system-tests-resources"
+JAR_FILE_NAME = "stream-pubsub-example-bundled-v-0.1.jar"
+GCS_JAR_PATH = f"gs://{RESOURCE_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
+# For a distributed setup, the JAR file needs to be stored in a folder that can be accessed by
+# multiple workers. For example, in Composer the correct path is gcs/data/stream-pubsub-example-bundled-v-0.1.jar,
+# because gcs/data/ is a shared folder for Airflow's workers.
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
+REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
+
+OUTPUT_TOPIC_ID = f"tp-{ENV_ID}-out"
+INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"
+OUTPUT_TOPIC = f"projects/{PROJECT_ID}/topics/{OUTPUT_TOPIC_ID}"
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2025, 2, 1),
+    catchup=False,
+    tags=["example", "dataflow", "java", "streaming"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+    download_file = GCSToLocalFilesystemOperator(
+        task_id="download_file",
+        object_name=f"dataflow/java/{JAR_FILE_NAME}",
+        bucket=RESOURCE_BUCKET,
+        filename=LOCAL_JAR,
+    )
+    create_output_pub_sub_topic = PubSubCreateTopicOperator(
+        task_id="create_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
+    )
+    # [START howto_operator_start_java_streaming]
+
+    start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator(
+        runner=BeamRunnerType.DataflowRunner,
+        task_id="start_java_streaming_dataflow_job",
+        jar=LOCAL_JAR,
+        pipeline_options={
+            "tempLocation": GCS_TMP,
+            "input_topic": INPUT_TOPIC,
+            "output_topic": OUTPUT_TOPIC,
+            "streaming": True,
+        },
+        dataflow_config={
+            "job_name": f"java-streaming-job-{ENV_ID}",
+            "location": LOCATION,
+        },
+    )
+    # [END howto_operator_start_java_streaming]
+    stop_dataflow_job = DataflowStopJobOperator(
+        task_id="stop_dataflow_job",
+        location=LOCATION,
+        job_id="{{ task_instance.xcom_pull(task_ids='start_java_streaming_dataflow_job')['dataflow_job_id'] }}",
+    )
+    delete_topic = PubSubDeleteTopicOperator(
+        task_id="delete_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID
+    )
+    delete_topic.trigger_rule = TriggerRule.ALL_DONE
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> download_file
+        >> create_output_pub_sub_topic
+        # TEST BODY
+        >> start_java_streaming_job_dataflow
+        # TEST TEARDOWN
+        >> stop_dataflow_job
+        >> delete_topic
+        >> delete_bucket
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
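+
+# A typical invocation (a sketch; see the referenced README for the authoritative command) is:
+#   pytest providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py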
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
new file mode 100644
index 00000000000..f8be0c9191e
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
@@ -0,0 +1,58 @@
+# Dataflow Java Streaming
+
+This project is a streaming example built with the latest Apache Beam Java SDK (v2.63),
+as there is no "official" streaming example yet (see https://beam.apache.org/get-started/wordcount-example/).
+
+It contains sample logic that forwards streaming data from an input Pub/Sub topic to an output Pub/Sub
+topic, i.e. it works with an unbounded data source and destination.
+
+It is used for the Java Dataflow streaming system tests.
+
+
+## Requirements
+
+- **Java 17**: Ensure you have Java 17 installed and configured on your system.
+- **Maven**: Make sure Maven is installed and configured on your system. Maven is used for
+  dependency management and building the project.
+
+## Project Structure
+
+The project's structure is as follows:
+
+```plaintext
+├── src
+│   ├── main
+│   │   ├── java
+│   │   │   └── org
+│   │   │       └── example
+│   │   │           └── pubsub
+│   │   │               └── StreamingExample.java
+├── pom.xml
+└── README.md
+```
+
+## Build
+The build was verified inside the Breeze container, with the dependencies from the
+[requirements](#requirements) installed.
+
+The output artifact is the executable `target/stream-pubsub-example-bundled-v-0.1.jar`.
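+
+A minimal build command (a sketch, assuming Maven and Java 17 are available, e.g. inside the Breeze container):
+
+```bash
+# The shade plugin is bound to the package phase, so packaging produces the bundled,
+# self-executing jar at target/stream-pubsub-example-bundled-v-0.1.jar.
+mvn clean package
+```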
+
+
+## Run
+To run the pipeline on Dataflow, use:
+```bash
+java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
+--runner=DataflowRunner \
+--project=<project_id> \
+--region=<location_id> \
+--jobName=<df_job_name> \
+--input_topic=<input_PubSub_topic> \
+--output_topic=<output_PubSub_topic>
+```
+
+Optionally, you can add `--labels=<custom_labels_dict>`, `--tempLocation=<gcs_path>`,
+`--stagingLocation=<gcs_path>`, or other Dataflow pipeline options, if needed.
+
+
+## Runners
+The `DataflowRunner` and `DirectRunner` are supported.
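+
+For a quick local smoke test without Dataflow, the same jar can be run with the Direct runner.
+A sketch (topic names are placeholders; valid Google Cloud credentials are still required for Pub/Sub):
+
+```bash
+java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
+--runner=DirectRunner \
+--input_topic=projects/<project_id>/topics/<input_topic> \
+--output_topic=projects/<project_id>/topics/<output_topic>
+```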
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
new file mode 100644
index 00000000000..85548bf7ab1
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
@@ -0,0 +1,131 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.example</groupId>
+ <artifactId>stream-pubsub-example</artifactId>
+ <version>v-0.1</version>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <beam.version>2.63.0</beam.version>
+ <slf4j.version>2.0.16</slf4j.version>
+ <google.cloud.version>1.113.0</google.cloud.version>
+ <secretmanager.version>2.45.0</secretmanager.version>
+ <iamcredentials.version>2.45.0</iamcredentials.version>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+
+ <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
+ <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
+ </properties>
+ <dependencies>
+ <!-- Beam Dependencies-->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ <!-- The Direct runner might be used for tests -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-pubsub</artifactId>
+ <version>${google.cloud.version}</version>
+ </dependency>
+ <!-- Log -->
+ <!-- Add slf4j API frontend binding with JUL backend on runtime-->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven-shade-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/LICENSE</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.example.pubsub.StreamingExample</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
new file mode 100644
index 00000000000..2616fccad92
--- /dev/null
+++ b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
@@ -0,0 +1,42 @@
+package org.example.pubsub;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import java.util.logging.Logger;
+
+
+public class StreamingExample {
+
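+ // Custom options: Beam maps getInput_topic()/getOutput_topic() to the --input_topic/--output_topic flags.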
+ public interface StreamingExampleOptions extends PipelineOptions, StreamingOptions {
+ @Description("Input Pub/Sub Topic")
+ @Validation.Required
+ String getInput_topic();
+ void setInput_topic(String inputTopic);
+
+ @Description("Output Pub/Sub Topic")
+ @Validation.Required
+ String getOutput_topic();
+ void setOutput_topic(String outputTopic);
+ }
+
+ public static void main(String[] args) {
+ StreamingExampleOptions options = PipelineOptionsFactory
+ .fromArgs(args)
+ .withValidation()
+ .as(StreamingExampleOptions.class);
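+ // Force streaming mode so the unbounded Pub/Sub source is consumed continuously.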
+ options.setStreaming(true);
+
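+ // Pass-through pipeline: read messages from the input topic and republish them to the output topic.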
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline
+ .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInput_topic()))
+ .apply("WriteToPubSub", PubsubIO.writeStrings().to(options.getOutput_topic()));
+
+ pipeline.run();
+ }
+}
diff --git a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
index 7139170aff7..7c486526dca 100755
--- a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
+++ b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
@@ -33,6 +33,7 @@ ACCEPTED_NON_INIT_DIRS = [
"static",
"dist",
"node_modules",
+ "non_python_src",
]
 PATH_EXTENSION_STRING = '__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore'