This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d793fda391 Add ability to run streaming Job for
BeamRunPythonPipelineOperator in non deferrable mode (#36108)
d793fda391 is described below
commit d793fda39161be9281d3d4da54e2e2b4f6344b4d
Author: Maksim <[email protected]>
AuthorDate: Mon Dec 18 01:15:23 2023 +0100
Add ability to run streaming Job for BeamRunPythonPipelineOperator in non
deferrable mode (#36108)
---
airflow/providers/apache/beam/hooks/beam.py | 14 ++-
airflow/providers/apache/beam/operators/beam.py | 48 +++++++-
airflow/providers/google/cloud/hooks/dataflow.py | 18 +++
.../providers/google/cloud/operators/dataflow.py | 6 +
.../operators/cloud/dataflow.rst | 18 ++-
tests/providers/apache/beam/hooks/test_beam.py | 21 +++-
tests/providers/apache/beam/operators/test_beam.py | 1 +
.../dataflow/example_dataflow_streaming_python.py | 126 +++++++++++++++++++++
8 files changed, 237 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/apache/beam/hooks/beam.py
b/airflow/providers/apache/beam/hooks/beam.py
index c27cc41710..efea53560b 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -90,6 +90,7 @@ def process_fd(
fd,
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
+ check_job_status_callback: Callable[[], bool | None] | None = None,
):
"""
Print output to logs.
@@ -111,6 +112,8 @@ def process_fd(
if process_line_callback:
process_line_callback(line)
func_log(line.rstrip("\n"))
+ if check_job_status_callback and check_job_status_callback():
+ return
def run_beam_command(
@@ -118,6 +121,7 @@ def run_beam_command(
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
+ check_job_status_callback: Callable[[], bool | None] | None = None,
) -> None:
"""
Run pipeline command in subprocess.
@@ -149,14 +153,16 @@ def run_beam_command(
continue
for readable_fd in readable_fds:
- process_fd(proc, readable_fd, log, process_line_callback)
+ process_fd(proc, readable_fd, log, process_line_callback,
check_job_status_callback)
+ if check_job_status_callback and check_job_status_callback():
+ return
if proc.poll() is not None:
break
# Corner case: check if more output was created between the last read and
the process termination
for readable_fd in reads:
- process_fd(proc, readable_fd, log, process_line_callback)
+ process_fd(proc, readable_fd, log, process_line_callback,
check_job_status_callback)
log.info("Process exited with return code: %s", proc.returncode)
@@ -187,6 +193,7 @@ class BeamHook(BaseHook):
command_prefix: list[str],
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
+ check_job_status_callback: Callable[[], bool | None] | None = None,
) -> None:
cmd = [*command_prefix, f"--runner={self.runner}"]
if variables:
@@ -196,6 +203,7 @@ class BeamHook(BaseHook):
process_line_callback=process_line_callback,
working_directory=working_directory,
log=self.log,
+ check_job_status_callback=check_job_status_callback,
)
def start_python_pipeline(
@@ -207,6 +215,7 @@ class BeamHook(BaseHook):
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
+ check_job_status_callback: Callable[[], bool | None] | None = None,
):
"""
Start Apache Beam python pipeline.
@@ -279,6 +288,7 @@ class BeamHook(BaseHook):
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
)
def start_java_pipeline(
diff --git a/airflow/providers/apache/beam/operators/beam.py
b/airflow/providers/apache/beam/operators/beam.py
index 943fc5b697..876f47fa2d 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -67,7 +67,7 @@ class BeamDataflowMixin(metaclass=ABCMeta):
self,
pipeline_options: dict,
job_name_variable_key: str | None = None,
- ) -> tuple[str, dict, Callable[[str], None]]:
+ ) -> tuple[str, dict, Callable[[str], None], Callable[[], bool | None]]:
self.dataflow_hook = self.__set_dataflow_hook()
self.dataflow_config.project_id = self.dataflow_config.project_id or
self.dataflow_hook.project_id
dataflow_job_name = self.__get_dataflow_job_name()
@@ -75,7 +75,8 @@ class BeamDataflowMixin(metaclass=ABCMeta):
pipeline_options, dataflow_job_name, job_name_variable_key
)
process_line_callback = self.__get_dataflow_process_callback()
- return dataflow_job_name, pipeline_options, process_line_callback
+ check_job_status_callback = self.__check_dataflow_job_status_callback()
+ return dataflow_job_name, pipeline_options, process_line_callback,
check_job_status_callback
def __set_dataflow_hook(self) -> DataflowHook:
self.dataflow_hook = DataflowHook(
@@ -123,6 +124,19 @@ class BeamDataflowMixin(metaclass=ABCMeta):
on_new_job_id_callback=set_current_dataflow_job_id
)
+ def __check_dataflow_job_status_callback(self) -> Callable[[], bool |
None]:  # builds the check_job_status_callback that _set_dataflow hands to BeamHook
+ def check_dataflow_job_status() -> bool | None:  # a truthy result makes BeamHook stop reading pipeline output and return
+ if self.dataflow_job_id and self.dataflow_hook:  # job ID presumably set by set_current_dataflow_job_id once reported — confirm
+ return self.dataflow_hook.is_job_done(  # NOTE(review): invoked after each output line, so this fetches the job per line; consider throttling
+ location=self.dataflow_config.location,
+ project_id=self.dataflow_config.project_id,
+ job_id=self.dataflow_job_id,
+ )
+ else:
+ return None  # nothing to poll yet; a falsy result lets BeamHook keep consuming output
+
+ return check_dataflow_job_status
+
class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
"""
@@ -184,14 +198,20 @@ class BeamBasePipelineOperator(BaseOperator,
BeamDataflowMixin, ABC):
self,
format_pipeline_options: bool = False,
job_name_variable_key: str | None = None,
- ) -> tuple[bool, str | None, dict, Callable[[str], None] | None]:
+ ) -> tuple[bool, str | None, dict, Callable[[str], None] | None,
Callable[[], bool | None] | None]:
self.beam_hook = BeamHook(runner=self.runner)
pipeline_options = self.default_pipeline_options.copy()
process_line_callback: Callable[[str], None] | None = None
+ check_job_status_callback: Callable[[], bool | None] | None = None
is_dataflow = self.runner.lower() ==
BeamRunnerType.DataflowRunner.lower()
dataflow_job_name: str | None = None
if is_dataflow:
- dataflow_job_name, pipeline_options, process_line_callback =
self._set_dataflow(
+ (
+ dataflow_job_name,
+ pipeline_options,
+ process_line_callback,
+ check_job_status_callback,
+ ) = self._set_dataflow(
pipeline_options=pipeline_options,
job_name_variable_key=job_name_variable_key,
)
@@ -203,9 +223,21 @@ class BeamBasePipelineOperator(BaseOperator,
BeamDataflowMixin, ABC):
snake_case_pipeline_options = {
convert_camel_to_snake(key): pipeline_options[key] for key in
pipeline_options
}
- return is_dataflow, dataflow_job_name,
snake_case_pipeline_options, process_line_callback
+ return (
+ is_dataflow,
+ dataflow_job_name,
+ snake_case_pipeline_options,
+ process_line_callback,
+ check_job_status_callback,
+ )
- return is_dataflow, dataflow_job_name, pipeline_options,
process_line_callback
+ return (
+ is_dataflow,
+ dataflow_job_name,
+ pipeline_options,
+ process_line_callback,
+ check_job_status_callback,
+ )
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
@@ -297,6 +329,7 @@ class
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
self.dataflow_job_name,
self.snake_case_pipeline_options,
self.process_line_callback,
+ self.check_job_status_callback,
) = self._init_pipeline_options(format_pipeline_options=True,
job_name_variable_key="job_name")
if not self.beam_hook:
raise AirflowException("Beam hook is not defined.")
@@ -329,6 +362,7 @@ class
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
py_requirements=self.py_requirements,
py_system_site_packages=self.py_system_site_packages,
process_line_callback=self.process_line_callback,
+
check_job_status_callback=self.check_job_status_callback,
)
DataflowJobLink.persist(
self,
@@ -495,6 +529,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
dataflow_job_name,
pipeline_options,
process_line_callback,
+ _,
) = self._init_pipeline_options()
if not self.beam_hook:
@@ -668,6 +703,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
dataflow_job_name,
snake_case_pipeline_options,
process_line_callback,
+ _,
) = self._init_pipeline_options(format_pipeline_options=True,
job_name_variable_key="job_name")
if not self.beam_hook:
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py
b/airflow/providers/google/cloud/hooks/dataflow.py
index 1769839a0b..6c051ce619 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -1203,6 +1203,24 @@ class DataflowHook(GoogleBaseHook):
)
job_controller.wait_for_done()
+ @GoogleBaseHook.fallback_to_default_project_id
+ def is_job_done(self, location: str, project_id: str, job_id: str) -> bool:
+ """
+ Return whether the Dataflow job is started (streaming job) or finished (batch job).
+
+ :param location: location in which the job is running
+ :param project_id: Google Cloud project ID that owns the job
+ :param job_id: Dataflow job ID
+ """
+ job_controller = _DataflowJobsController(
+ dataflow=self.get_conn(),
+ project_number=project_id,
+ location=location,
+ )
+ job = job_controller.fetch_job_by_id(job_id)
+
+ return job_controller._check_dataflow_job_state(job)  # private controller check; may raise on unexpected states — TODO confirm
+
class AsyncDataflowHook(GoogleBaseAsyncHook):
"""Async hook class for dataflow service."""
diff --git a/airflow/providers/google/cloud/operators/dataflow.py
b/airflow/providers/google/cloud/operators/dataflow.py
index 59b2715886..ec813c55eb 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -1286,6 +1286,12 @@ class DataflowStopJobOperator(GoogleCloudBaseOperator):
:param stop_timeout: wait time in seconds for successful job
canceling/draining
"""
+ template_fields = [
+ "job_id",
+ "project_id",
+ "impersonation_chain",
+ ]
+
def __init__(
self,
job_name_prefix: str | None = None,
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
index 366f14f872..3851cb8fa8 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
@@ -65,9 +65,9 @@ Starting Non-templated pipeline
To create a new pipeline using the source file (JAR in Java or Python file) use
the create job operators. The source file can be located on GCS or on the
local filesystem.
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
or
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
.. _howto/operator:DataflowCreateJavaJobOperator:
@@ -75,7 +75,7 @@ Java SDK pipelines
""""""""""""""""""
For Java pipeline the ``jar`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
as it contains the pipeline to be executed on Dataflow. The JAR can be
available on GCS that Airflow
has the ability to download or available on the local filesystem (provide the
absolute path to it).
@@ -101,7 +101,7 @@ Python SDK pipelines
""""""""""""""""""""
The ``py_file`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
as it contains the pipeline to be executed on Dataflow. The Python file can be
available on GCS that Airflow
has the ability to download or available on the local filesystem (provide the
absolute path to it).
@@ -129,8 +129,8 @@ Dataflow has multiple options of executing pipelines. It
can be done in the foll
batch asynchronously (fire and forget), batch blocking (wait until
completion), or streaming (run indefinitely).
In Airflow it is best practice to use asynchronous batch pipelines or streams
and use sensors to listen for expected job state.
-By default
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`,
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`,
+By default
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`,
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`,
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator`
and
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`
have argument ``wait_until_finished`` set to ``None`` which cause different
behaviour depends on the type of pipeline:
@@ -175,6 +175,12 @@ Streaming execution
To execute a streaming Dataflow job, ensure the streaming option is set (for
Python) or read from an unbounded data
source, such as Pub/Sub, in your pipeline (for Java).
+.. exampleinclude::
/../../tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_start_streaming_python_job]
+ :end-before: [END howto_operator_start_streaming_python_job]
+
Setting argument ``drain_pipeline`` to ``True`` allows to stop streaming job
by draining it
instead of canceling during killing task instance.
diff --git a/tests/providers/apache/beam/hooks/test_beam.py
b/tests/providers/apache/beam/hooks/test_beam.py
index 1cfe71121c..ce33b683e2 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -67,12 +67,14 @@ class TestBeamHook:
def test_start_python_pipeline(self, mock_check_output, mock_runner):
hook = BeamHook(runner=DEFAULT_RUNNER)
process_line_callback = MagicMock()
+ check_job_status_callback = MagicMock()
hook.start_python_pipeline(
variables=copy.deepcopy(BEAM_VARIABLES_PY),
py_file=PY_FILE,
py_options=PY_OPTIONS,
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
)
expected_cmd = [
@@ -88,6 +90,7 @@ class TestBeamHook:
process_line_callback=process_line_callback,
working_directory=None,
log=ANY,
+ check_job_status_callback=check_job_status_callback,
)
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output",
return_value=b"2.35.0")
@@ -108,6 +111,7 @@ class TestBeamHook:
py_requirements=None,
py_system_site_packages=False,
process_line_callback=MagicMock(),
+ check_job_status_callback=MagicMock(),
)
@pytest.mark.parametrize(
@@ -126,6 +130,7 @@ class TestBeamHook:
):
hook = BeamHook(runner=DEFAULT_RUNNER)
process_line_callback = MagicMock()
+ check_job_status_callback = MagicMock()
hook.start_python_pipeline(
variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -133,6 +138,7 @@ class TestBeamHook:
py_options=PY_OPTIONS,
py_interpreter=py_interpreter,
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
)
expected_cmd = [
@@ -148,6 +154,7 @@ class TestBeamHook:
process_line_callback=process_line_callback,
working_directory=None,
log=ANY,
+ check_job_status_callback=check_job_status_callback,
)
@pytest.mark.parametrize(
@@ -172,6 +179,7 @@ class TestBeamHook:
hook = BeamHook(runner=DEFAULT_RUNNER)
mock_virtualenv.return_value = "/dummy_dir/bin/python"
process_line_callback = MagicMock()
+ check_job_status_callback = MagicMock()
hook.start_python_pipeline(
variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -180,6 +188,7 @@ class TestBeamHook:
py_requirements=current_py_requirements,
py_system_site_packages=current_py_system_site_packages,
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
)
expected_cmd = [
@@ -193,6 +202,7 @@ class TestBeamHook:
mock_runner.assert_called_once_with(
cmd=expected_cmd,
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
working_directory=None,
log=ANY,
)
@@ -211,6 +221,7 @@ class TestBeamHook:
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
+ check_job_status_callback = MagicMock()
with pytest.raises(AirflowException, match=r"Invalid method
invocation\."):
hook.start_python_pipeline(
@@ -219,6 +230,7 @@ class TestBeamHook:
py_options=PY_OPTIONS,
py_requirements=[],
process_line_callback=process_line_callback,
+ check_job_status_callback=check_job_status_callback,
)
mock_runner.assert_not_called()
@@ -244,7 +256,11 @@ class TestBeamHook:
'--labels={"foo":"bar"}',
]
mock_runner.assert_called_once_with(
- cmd=expected_cmd, process_line_callback=process_line_callback,
working_directory=None, log=ANY
+ cmd=expected_cmd,
+ process_line_callback=process_line_callback,
+ working_directory=None,
+ log=ANY,
+ check_job_status_callback=None,
)
@mock.patch(BEAM_STRING.format("run_beam_command"))
@@ -273,6 +289,7 @@ class TestBeamHook:
process_line_callback=process_line_callback,
working_directory=None,
log=ANY,
+ check_job_status_callback=None,
)
@mock.patch(BEAM_STRING.format("shutil.which"))
@@ -303,6 +320,7 @@ class TestBeamHook:
process_line_callback=process_line_callback,
working_directory=go_workspace,
log=ANY,
+ check_job_status_callback=None,
)
@mock.patch(BEAM_STRING.format("shutil.which"))
@@ -348,6 +366,7 @@ class TestBeamHook:
process_line_callback=process_line_callback,
working_directory=None,
log=ANY,
+ check_job_status_callback=None,
)
diff --git a/tests/providers/apache/beam/operators/test_beam.py
b/tests/providers/apache/beam/operators/test_beam.py
index d67b3ff647..8b6f57cccc 100644
--- a/tests/providers/apache/beam/operators/test_beam.py
+++ b/tests/providers/apache/beam/operators/test_beam.py
@@ -160,6 +160,7 @@ class TestBeamRunPythonPipelineOperator:
py_requirements=None,
py_system_site_packages=False,
process_line_callback=mock.ANY,
+ check_job_status_callback=mock.ANY,
)
dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
diff --git
a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
new file mode 100644
index 0000000000..0489117e5d
--- /dev/null
+++
b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with
Python for Streaming job.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
+from airflow.providers.apache.beam.operators.beam import
BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import
DataflowStopJobOperator
+from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.pubsub import (
+ PubSubCreateTopicOperator,
+ PubSubDeleteTopicOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "dataflow_native_python_streaming"
+
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
+GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
+GCS_PYTHON_SCRIPT =
f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/streaming_wordcount.py"
+LOCATION = "europe-west3"
+TOPIC_ID = f"topic-{DAG_ID}"
+
+default_args = {
+ "dataflow_default_options": {
+ "tempLocation": GCS_TMP,
+ "stagingLocation": GCS_STAGING,
+ }
+}
+
+with DAG(
+ DAG_ID,
+ default_args=default_args,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "dataflow"],
+) as dag:
+ create_bucket = GCSCreateBucketOperator(task_id="create_bucket",
bucket_name=BUCKET_NAME)
+
+ create_pub_sub_topic = PubSubCreateTopicOperator(
+ task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID,
fail_if_exists=False
+ )
+
+ # [START howto_operator_start_streaming_python_job]
+ start_streaming_python_job = BeamRunPythonPipelineOperator(
+ runner=BeamRunnerType.DataflowRunner,
+ task_id="start_streaming_python_job",
+ py_file=GCS_PYTHON_SCRIPT,
+ py_options=[],
+ pipeline_options={
+ "temp_location": GCS_TMP,
+ "input_topic":
"projects/pubsub-public-data/topics/taxirides-realtime",
+ "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
+ "streaming": True,
+ },
+ py_requirements=["apache-beam[gcp]==2.46.0"],
+ py_interpreter="python3",
+ py_system_site_packages=False,
+ dataflow_config={"location": LOCATION},
+ )
+ # [END howto_operator_start_streaming_python_job]
+
+ stop_dataflow_job = DataflowStopJobOperator(
+ task_id="stop_dataflow_job",
+ location=LOCATION,
+ job_id="{{
task_instance.xcom_pull(task_ids='start_streaming_python_job')['dataflow_job_id']
}}",
+ )
+
+ delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic",
topic=TOPIC_ID, project_id=PROJECT_ID)
+ delete_topic.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ >> create_pub_sub_topic
+ # TEST BODY
+ >> start_streaming_python_job
+ # TEST TEARDOWN
+ >> stop_dataflow_job
+ >> delete_topic
+ >> delete_bucket
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "teardown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)