This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dcb15f3d1f Add Dataflow Apache Beam Java streaming system test 
(#47209)
2dcb15f3d1f is described below

commit 2dcb15f3d1ffa8fa1474d6833a39a81dc3500a96
Author: olegkachur-e <[email protected]>
AuthorDate: Sun Mar 9 21:56:27 2025 +0100

    Add Dataflow Apache Beam Java streaming system test (#47209)
    
    - add system test
    - update docs
    - add sources to the build jar file
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 providers/google/docs/operators/cloud/dataflow.rst |   8 ++
 .../dataflow/example_dataflow_java_streaming.py    | 145 +++++++++++++++++++++
 .../non_python_src/java_streaming_src/README.MD    |  58 +++++++++
 .../non_python_src/java_streaming_src/pom.xml      | 131 +++++++++++++++++++
 .../java/org/example/pubsub/StreamingExample.java  |  42 ++++++
 .../check_providers_subpackages_all_have_init.py   |   1 +
 6 files changed, 385 insertions(+)

diff --git a/providers/google/docs/operators/cloud/dataflow.rst 
b/providers/google/docs/operators/cloud/dataflow.rst
index f4b4408f591..f72f7248796 100644
--- a/providers/google/docs/operators/cloud/dataflow.rst
+++ b/providers/google/docs/operators/cloud/dataflow.rst
@@ -138,6 +138,14 @@ Here is an example of creating and running a pipeline in 
Java with jar stored on
     :start-after: [START howto_operator_start_java_job_local_jar]
     :end-before: [END howto_operator_start_java_job_local_jar]
 
+Here is an example of creating and running a streaming pipeline in Java with 
jar stored on GCS:
+
+.. exampleinclude:: 
/../../providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_java_streaming]
+    :end-before: [END howto_operator_start_java_streaming]
+
 .. _howto/operator:PythonSDKPipelines:
 
 Python SDK pipelines
diff --git 
a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
 
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
new file mode 100644
index 00000000000..cf6c3228fc7
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with 
Java(streaming).
+
+Important Note:
+    This test downloads the Java JAR file from a public bucket. In case the JAR file cannot be downloaded
+    or is not compatible with the Java version used in the test, it can be rebuilt from the sources.
+    There is no official streaming pipeline example for the Apache Beam Java SDK, so the source code and
+    build instructions are located in `providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/`.
+
+    You can follow the instructions on how to pack a self-executing jar here:
+    https://beam.apache.org/documentation/runners/dataflow/
+
+Requirements:
+    These operators require the gcloud command and Java's JRE to run.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
+from airflow.providers.apache.beam.operators.beam import 
BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStopJobOperator
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.pubsub import (
+    PubSubCreateTopicOperator,
+    PubSubDeleteTopicOperator,
+)
+from airflow.providers.google.cloud.transfers.gcs_to_local import 
GCSToLocalFilesystemOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "dataflow_java_streaming"
+LOCATION = "europe-west3"
+BUCKET_NAME = f"bucket-{DAG_ID}-{ENV_ID}"
+GCS_TMP = f"gs://{BUCKET_NAME}/temp"
+GCS_OUTPUT = f"gs://{BUCKET_NAME}/DF_OUT"
+RESOURCE_BUCKET = "airflow-system-tests-resources"
+JAR_FILE_NAME = "stream-pubsub-example-bundled-v-0.1.jar"
+GCS_JAR_PATH = f"gs://{RESOURCE_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
+# For the distributed system, we need to store the JAR file in a folder that 
can be accessed by multiple
+# worker.
+# For example in Composer the correct path is 
gcs/data/word-count-beam-bundled-0.1.jar.
+# Because gcs/data/ is shared folder for Airflow's workers.
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
+REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
+
+OUTPUT_TOPIC_ID = f"tp-{ENV_ID}-out"
+INPUT_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"
+OUTPUT_TOPIC = f"projects/{PROJECT_ID}/topics/{OUTPUT_TOPIC_ID}"
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2025, 2, 1),
+    catchup=False,
+    tags=["example", "dataflow", "java", "streaming"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", 
bucket_name=BUCKET_NAME)
+    download_file = GCSToLocalFilesystemOperator(
+        task_id="download_file",
+        object_name=f"dataflow/java/{JAR_FILE_NAME}",
+        bucket=RESOURCE_BUCKET,
+        filename=LOCAL_JAR,
+    )
+    create_output_pub_sub_topic = PubSubCreateTopicOperator(
+        task_id="create_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID, 
fail_if_exists=False
+    )
+    # [START howto_operator_start_java_streaming]
+
+    start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator(
+        runner=BeamRunnerType.DataflowRunner,
+        task_id="start_java_streaming_dataflow_job",
+        jar=LOCAL_JAR,
+        pipeline_options={
+            "tempLocation": GCS_TMP,
+            "input_topic": INPUT_TOPIC,
+            "output_topic": OUTPUT_TOPIC,
+            "streaming": True,
+        },
+        dataflow_config={
+            "job_name": f"java-streaming-job-{ENV_ID}",
+            "location": LOCATION,
+        },
+    )
+    # [END howto_operator_start_java_streaming]
+    stop_dataflow_job = DataflowStopJobOperator(
+        task_id="stop_dataflow_job",
+        location=LOCATION,
+        job_id="{{ 
task_instance.xcom_pull(task_ids='start_java_streaming_dataflow_job')['dataflow_job_id']
 }}",
+    )
+    delete_topic = PubSubDeleteTopicOperator(
+        task_id="delete_topic", topic=OUTPUT_TOPIC_ID, project_id=PROJECT_ID
+    )
+    delete_topic.trigger_rule = TriggerRule.ALL_DONE
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> download_file
+        >> create_output_pub_sub_topic
+        # TEST BODY
+        >> start_java_streaming_job_dataflow
+        # TEST TEARDOWN
+        >> stop_dataflow_job
+        >> delete_topic
+        >> delete_bucket
+    )
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
new file mode 100644
index 00000000000..f8be0c9191e
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/README.MD
@@ -0,0 +1,58 @@
+# Dataflow Java Streaming
+
+This project is a streaming example running with latest Apache Beam Java SDK 
(v.2.63),
+as there is no "official" streaming example yet 
https://beam.apache.org/get-started/wordcount-example/.
+
+The sample pipeline forwards streaming data from an input Pub/Sub topic to an output Pub/Sub topic,
+i.e. it works with an unbounded data source and destination.
+
+It is used for the Java Dataflow streaming system tests.
+
+
+## Requirements
+
+- **Java 17**: Ensure you have Java 17 installed and configured on your system.
+- **Maven**: Make sure Maven is installed and configured on your system.
+- Maven is used for dependency management and building the project.
+
+## Project Structure
+
+The project's structure is as follows:
+
+```plaintext
+├── src
+│   ├── main
+│   │   ├── java
+│   │   │   └── org
+│   │   │       └── example
+│   │   │           └── pubsub
+│   │   │               └── StreamingExample.java
+├── pom.xml
+└── README.MD
+```
+
+## Build
+Build with `mvn clean package`; the build was verified inside the Breeze container with the dependencies from the [requirements](#requirements) installed.
+
+The output artifact is `target/stream-pubsub-example-bundled-v-0.1.jar` 
executable.
+
+
+## Run
+To run the pipeline, use:
+```bash
+java -jar target/stream-pubsub-example-bundled-v-0.1.jar \
+--runner=DataflowRunner \
+--project=<project_id> \
+--region=<location_id> \
+--jobName=<df_job_name> \
+--input_topic=<input_PubSub_topic> \
+--output_topic=<output_PubSub_topic>
+```
+
+Optionally, you can add
+`--labels=<custom_labels_dict>`, `--tempLocation=<gcs_path>`, `--stagingLocation=<gcs_path>`
+or other Dataflow pipeline options, if needed.
+
+
+## Runners
+The `DataflowRunner` and `DirectRunner` are supported.
diff --git 
a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
new file mode 100644
index 00000000000..85548bf7ab1
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/pom.xml
@@ -0,0 +1,131 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://www.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.example</groupId>
+    <artifactId>stream-pubsub-example</artifactId>
+    <version>v-0.1</version>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <beam.version>2.63.0</beam.version>
+        <slf4j.version>2.0.16</slf4j.version>
+        <google.cloud.version>1.113.0</google.cloud.version>
+        <secretmanager.version>2.45.0</secretmanager.version>
+        <iamcredentials.version>2.45.0</iamcredentials.version>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+
+        <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
+        <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
+    </properties>
+    <dependencies>
+        <!--        Beam Dependencies-->
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+<!--        Direct runner might be used for tests-->
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-pubsub</artifactId>
+            <version>${google.cloud.version}</version>
+        </dependency>
+        <!-- Log -->
+        <!-- Add slf4j API frontend binding with JUL backend on runtime-->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-jdk14</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/LICENSE</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    
<mainClass>org.example.pubsub.StreamingExample</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
new file mode 100644
index 00000000000..2616fccad92
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/dataflow/resources/non_python_src/java_streaming_src/src/main/java/org/example/pubsub/StreamingExample.java
@@ -0,0 +1,42 @@
+package org.example.pubsub;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import java.util.logging.Logger;
+
+
+/**
+ * Minimal streaming pipeline that reads string messages from one Pub/Sub topic
+ * and writes them unchanged to another Pub/Sub topic.
+ */
+public class StreamingExample {
+
+    /**
+     * Command-line options of the pipeline. Both topics are marked as required,
+     * and the getter names define the flag names ({@code --input_topic}, {@code --output_topic}).
+     */
+    public interface StreamingExampleOptions extends PipelineOptions, StreamingOptions {
+        @Description("Input Pub/Sub Topic")
+        @Validation.Required
+        String getInput_topic();
+        void setInput_topic(String inputTopic);
+
+        @Description("Output Pub/Sub Topic")
+        @Validation.Required
+        String getOutput_topic();
+        void setOutput_topic(String outputTopic);
+    }
+
+    /**
+     * Builds the read-then-write pipeline from the given arguments and submits it to the runner.
+     *
+     * @param args pipeline options in {@code --name=value} form
+     */
+    public static void main(String[] args) {
+        StreamingExampleOptions pipelineOptions = parseOptions(args);
+        Pipeline streamingPipeline = Pipeline.create(pipelineOptions);
+
+        streamingPipeline
+            .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(pipelineOptions.getInput_topic()))
+            .apply("WriteToPubSub", PubsubIO.writeStrings().to(pipelineOptions.getOutput_topic()));
+
+        streamingPipeline.run();
+    }
+
+    /** Parses and validates the arguments, then switches the options to streaming mode. */
+    private static StreamingExampleOptions parseOptions(String[] args) {
+        StreamingExampleOptions parsed = PipelineOptionsFactory
+            .fromArgs(args)
+            .withValidation()
+            .as(StreamingExampleOptions.class);
+        parsed.setStreaming(true);
+        return parsed;
+    }
+}
diff --git a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py 
b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
index 7139170aff7..7c486526dca 100755
--- a/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
+++ b/scripts/ci/pre_commit/check_providers_subpackages_all_have_init.py
@@ -33,6 +33,7 @@ ACCEPTED_NON_INIT_DIRS = [
     "static",
     "dist",
     "node_modules",
+    "non_python_src",
 ]
 
 PATH_EXTENSION_STRING = '__path__ = 
__import__("pkgutil").extend_path(__path__, __name__)  # type: ignore'

Reply via email to