This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 44b4a3755d Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471)
44b4a3755d is described below
commit 44b4a3755deea23f8ff0eb1316db880b1d64812c
Author: VladaZakharova <[email protected]>
AuthorDate: Thu Jul 13 18:37:11 2023 +0200
Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471)
* Add deferrable mode to BeamRunPythonPipelineOperator
* Fix docs
* Fix deferrable parameter in init
* fix system test
---
airflow/providers/apache/beam/hooks/beam.py | 190 ++++++++++++++++++++-
airflow/providers/apache/beam/operators/beam.py | 113 +++++++++---
airflow/providers/apache/beam/provider.yaml | 5 +
airflow/providers/apache/beam/triggers/__init__.py | 16 ++
airflow/providers/apache/beam/triggers/beam.py | 116 +++++++++++++
.../operators.rst | 29 ++++
tests/providers/apache/beam/hooks/test_beam.py | 177 ++++++++++++++++++-
tests/providers/apache/beam/operators/test_beam.py | 155 +++++++++++++++--
tests/providers/apache/beam/triggers/__init__.py | 16 ++
tests/providers/apache/beam/triggers/test_beam.py | 107 ++++++++++++
.../system/providers/apache/beam/example_python.py | 11 +-
.../{example_python.py => example_python_async.py} | 34 ++--
12 files changed, 911 insertions(+), 58 deletions(-)
diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py
index 264d9076e0..d92ade2050 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -18,8 +18,10 @@
"""This module contains a Apache Beam Hook."""
from __future__ import annotations
+import asyncio
import contextlib
import copy
+import functools
import json
import logging
import os
@@ -27,8 +29,8 @@ import select
import shlex
import shutil
import subprocess
+import tempfile
import textwrap
-from tempfile import TemporaryDirectory
from typing import Callable
from packaging.version import Version
@@ -222,7 +224,7 @@ class BeamHook(BaseHook):
If a value is passed to this parameter, a new virtual environment has been created with
additional packages installed.
- You could also install the apache-beam package if it is not installed on your system or you want
+ You could also install the apache-beam package if it is not installed on your system, or you want
to use a different version.
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
@@ -251,7 +253,7 @@ class BeamHook(BaseHook):
"""
)
raise AirflowException(warning_invalid_environment)
- tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv"))
+ tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-venv"))
py_interpreter = prepare_virtualenv(
venv_directory=tmp_dir,
python_bin=py_interpreter,
@@ -381,3 +383,185 @@ class BeamHook(BaseHook):
command_prefix=command_prefix,
process_line_callback=process_line_callback,
)
+
+
+class BeamAsyncHook(BeamHook):
+ """
+ Asynchronous hook for Apache Beam.
+ :param runner: Runner type.
+ """
+
+ def __init__(
+ self,
+ runner: str,
+ ) -> None:
+ self.runner = runner
+ super().__init__(runner=self.runner)
+
+ @staticmethod
+ async def _create_tmp_dir(prefix: str) -> str:
+ """Helper method to create temporary directory."""
+ # Creating separate thread to create temporary directory
+ loop = asyncio.get_running_loop()
+ partial_func = functools.partial(tempfile.mkdtemp, prefix=prefix)
+ tmp_dir = await loop.run_in_executor(None, partial_func)
+ return tmp_dir
+
+ @staticmethod
+ async def _cleanup_tmp_dir(tmp_dir: str) -> None:
+ """
+ Helper method to delete temporary directory after finishing work with it.
+ It uses the `rmtree` method to recursively remove the temporary directory.
+ """
+ shutil.rmtree(tmp_dir)
+
+ async def start_python_pipeline_async(
+ self,
+ variables: dict,
+ py_file: str,
+ py_options: list[str] | None = None,
+ py_interpreter: str = "python3",
+ py_requirements: list[str] | None = None,
+ py_system_site_packages: bool = False,
+ ):
+ """
+ Starts Apache Beam python pipeline.
+
+ :param variables: Variables passed to the pipeline.
+ :param py_file: Path to the python file to execute.
+ :param py_options: Additional options.
+ :param py_interpreter: Python version of the Apache Beam pipeline.
+ If None, this defaults to the python3.
+ To track python versions supported by beam and related
+ issues check: https://issues.apache.org/jira/browse/BEAM-1251
+ :param py_requirements: Additional python package(s) to install.
+ If a value is passed to this parameter, a new virtual environment has been created with
+ additional packages installed.
+ You could also install the apache-beam package if it is not installed on your system, or you want
+ to use a different version.
+ :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
+ See virtualenv documentation for more information.
+ This option is only relevant if the ``py_requirements`` parameter is not None.
+ """
+ if "labels" in variables:
+ variables["labels"] = [f"{key}={value}" for key, value in
variables["labels"].items()]
+
+ # Creating temporary directory
+ tmp_dir = await self._create_tmp_dir(prefix="apache-beam-venv")
+
+ async with contextlib.AsyncExitStack() as exit_stack:
+ if py_requirements is not None:
+ if not py_requirements and not py_system_site_packages:
+ warning_invalid_environment = textwrap.dedent(
+ """\
+ Invalid method invocation. You have disabled inclusion of system packages and empty
+ list required for installation, so it is not possible to create a valid virtual
+ environment. In the virtual environment, apache-beam package must be installed for
+ your job to be executed.
+
+ To fix this problem:
+ * install apache-beam on the system, then set parameter py_system_site_packages
+ to True,
+ * add apache-beam to the list of required packages in parameter py_requirements.
+ """
+ )
+ raise AirflowException(warning_invalid_environment)
+
+ # Pushing asynchronous callback to ensure the cleanup of the temporary
+ # directory when the asynchronous context is exited
+ exit_stack.push_async_callback(self._cleanup_tmp_dir, tmp_dir)
+
+ py_interpreter = prepare_virtualenv(
+ venv_directory=tmp_dir,
+ python_bin=py_interpreter,
+ system_site_packages=py_system_site_packages,
+ requirements=py_requirements,
+ )
+ command_prefix: list[str] = [py_interpreter] + (py_options or []) + [py_file]
+ beam_version = (
+ subprocess.check_output(
+ [py_interpreter, "-c", "import apache_beam;
print(apache_beam.__version__)"]
+ )
+ .decode()
+ .strip()
+ )
+ self.log.info("Beam version: %s", beam_version)
+ impersonate_service_account = variables.get("impersonate_service_account")
+ if impersonate_service_account:
+ if Version(beam_version) < Version("2.39.0"):
+ raise AirflowException(
+ "The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."
+ )
+ return_code = await self.start_pipeline_async(
+ variables=variables,
+ command_prefix=command_prefix,
+ )
+ return return_code
+
+ async def start_pipeline_async(
+ self,
+ variables: dict,
+ command_prefix: list[str],
+ working_directory: str | None = None,
+ ) -> int:
+ cmd = command_prefix + [
+ f"--runner={self.runner}",
+ ]
+ if variables:
+ cmd.extend(beam_options_to_args(variables))
+ return await self.run_beam_command_async(
+ cmd=cmd,
+ working_directory=working_directory,
+ log=self.log,
+ )
+
+ async def run_beam_command_async(
+ self,
+ cmd: list[str],
+ log: logging.Logger,
+ working_directory: str | None = None,
+ ) -> int:
+ """
+ Function responsible for running pipeline command in subprocess.
+
+ :param cmd: Parts of the command to be run in subprocess
+ :param working_directory: Working directory
+ :param log: logger.
+ """
+ cmd_str_representation = " ".join(shlex.quote(c) for c in cmd)
+ log.info("Running command: %s", cmd_str_representation)
+
+ # Creating a separate asynchronous process
+ process = await asyncio.create_subprocess_shell(
+ cmd_str_representation,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True,
+ cwd=working_directory,
+ )
+ # Waits for Apache Beam pipeline to complete.
+ log.info("Start waiting for Apache Beam process to complete.")
+
+ # Creating separate threads for stdout and stderr
+ stdout_task = asyncio.create_task(self.read_logs(process.stdout))
+ stderr_task = asyncio.create_task(self.read_logs(process.stderr))
+
+ # Waiting for both tasks to complete
+ await asyncio.gather(stdout_task, stderr_task)
+
+ # Wait for the process to complete and return return_code
+ return_code = await process.wait()
+ log.info("Process exited with return code: %s", return_code)
+
+ if return_code != 0:
+ raise AirflowException(f"Apache Beam process failed with return
code {return_code}")
+ return return_code
+
+ async def read_logs(self, stream_reader):
+ while True:
+ line = await stream_reader.readline()
+ if not line:
+ break
+ decoded_line = line.decode().strip()
+ self.log.info(decoded_line)
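For illustration, a minimal sketch of driving the new BeamAsyncHook directly from an asyncio program, outside of the operator and trigger machinery; the pipeline module, options, and pinned apache-beam version below are placeholders rather than part of this change:

    import asyncio

    from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook


    async def main() -> None:
        # With py_requirements set, the hook builds a throwaway virtualenv, runs the
        # pipeline in a subprocess, and returns the subprocess return code
        # (run_beam_command_async raises AirflowException on a non-zero exit).
        hook = BeamAsyncHook(runner="DirectRunner")
        return_code = await hook.start_python_pipeline_async(
            variables={"output": "/tmp/beam_output"},
            py_file="apache_beam.examples.wordcount",
            py_options=["-m"],
            py_requirements=["apache-beam[gcp]==2.46.0"],
        )
        print(f"Pipeline finished with return code {return_code}")


    if __name__ == "__main__":
        asyncio.run(main())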
diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py
index 68e6972547..cc7d217abb 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -18,6 +18,8 @@
"""This module contains Apache Beam operators."""
from __future__ import annotations
+import asyncio
+import contextlib
import copy
import os
import stat
@@ -26,11 +28,13 @@ from abc import ABC, ABCMeta, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import ExitStack
from functools import partial
-from typing import TYPE_CHECKING, Callable, Sequence
+from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
from airflow import AirflowException
+from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
from airflow.providers.google.cloud.hooks.dataflow import (
DataflowHook,
process_line_and_extract_dataflow_job_id_callback,
@@ -144,7 +148,7 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
When defining labels (labels option), you can also provide a dictionary.
:param gcp_conn_id: Optional.
The connection ID to use connecting to Google Cloud Storage if python file is on GCS.
- :param dataflow_config: Dataflow configuration, used when runner type is set to DataflowRunner,
+ :param dataflow_config: Dataflow's configuration, used when runner type is set to DataflowRunner,
(optional) defaults to None.
"""
@@ -167,7 +171,7 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
self.dataflow_config = DataflowConfiguration(**dataflow_config)
else:
self.dataflow_config = dataflow_config or DataflowConfiguration()
- self.beam_hook: BeamHook | None = None
+ self.beam_hook: BeamHook
self.dataflow_hook: DataflowHook | None = None
self.dataflow_job_id: str | None = None
@@ -237,8 +241,8 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
to use a different version.
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
-
This option is only relevant if the ``py_requirements`` parameter is not None.
+ :param deferrable: Run operator in the deferrable mode: checks for the state using asynchronous calls.
"""
template_fields: Sequence[str] = (
@@ -264,6 +268,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
py_system_site_packages: bool = False,
gcp_conn_id: str = "google_cloud_default",
dataflow_config: DataflowConfiguration | dict | None = None,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
super().__init__(
@@ -283,35 +288,42 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+",
"-")}
)
+ self.deferrable = deferrable
def execute(self, context: Context):
"""Execute the Apache Beam Pipeline."""
(
- is_dataflow,
- dataflow_job_name,
- snake_case_pipeline_options,
- process_line_callback,
+ self.is_dataflow,
+ self.dataflow_job_name,
+ self.snake_case_pipeline_options,
+ self.process_line_callback,
) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")
-
if not self.beam_hook:
raise AirflowException("Beam hook is not defined.")
+ # Check deferrable parameter passed to the operator
+ # to determine type of run - asynchronous or synchronous
+ if self.deferrable:
+ asyncio.run(self.execute_async(context))
+ else:
+ return self.execute_sync(context)
+ def execute_sync(self, context: Context):
with ExitStack() as exit_stack:
if self.py_file.lower().startswith("gs://"):
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
self.py_file = tmp_gcs_file.name
- if is_dataflow and self.dataflow_hook:
+ if self.is_dataflow and self.dataflow_hook:
with self.dataflow_hook.provide_authorized_gcloud():
self.beam_hook.start_python_pipeline(
- variables=snake_case_pipeline_options,
+ variables=self.snake_case_pipeline_options,
py_file=self.py_file,
py_options=self.py_options,
py_interpreter=self.py_interpreter,
py_requirements=self.py_requirements,
py_system_site_packages=self.py_system_site_packages,
- process_line_callback=process_line_callback,
+ process_line_callback=self.process_line_callback,
)
DataflowJobLink.persist(
self,
@@ -320,25 +332,82 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
self.dataflow_config.location,
self.dataflow_job_id,
)
- if dataflow_job_name and self.dataflow_config.location:
- self.dataflow_hook.wait_for_done(
- job_name=dataflow_job_name,
- location=self.dataflow_config.location,
- job_id=self.dataflow_job_id,
- multiple_jobs=False,
- project_id=self.dataflow_config.project_id,
- )
return {"dataflow_job_id": self.dataflow_job_id}
else:
self.beam_hook.start_python_pipeline(
- variables=snake_case_pipeline_options,
+ variables=self.snake_case_pipeline_options,
py_file=self.py_file,
py_options=self.py_options,
py_interpreter=self.py_interpreter,
py_requirements=self.py_requirements,
py_system_site_packages=self.py_system_site_packages,
- process_line_callback=process_line_callback,
+ process_line_callback=self.process_line_callback,
+ )
+
+ async def execute_async(self, context: Context):
+ # Creating a new event loop to manage I/O operations asynchronously
+ loop = asyncio.get_event_loop()
+ if self.py_file.lower().startswith("gs://"):
+ gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
+ # Running synchronous `enter_context()` method in a separate
+ # thread using the default executor `None`. The `run_in_executor()` function returns the
+ # file object, which is created using gcs function `provide_file()`, asynchronously.
+ # This means we can perform asynchronous operations with this file.
+ create_tmp_file_call = gcs_hook.provide_file(object_url=self.py_file)
+ tmp_gcs_file: IO[str] = await loop.run_in_executor(
+ None, contextlib.ExitStack().enter_context, create_tmp_file_call
+ )
+ self.py_file = tmp_gcs_file.name
+
+ if self.is_dataflow and self.dataflow_hook:
+ DataflowJobLink.persist(
+ self,
+ context,
+ self.dataflow_config.project_id,
+ self.dataflow_config.location,
+ self.dataflow_job_id,
+ )
+ with self.dataflow_hook.provide_authorized_gcloud():
+ self.defer(
+ trigger=BeamPipelineTrigger(
+ variables=self.snake_case_pipeline_options,
+ py_file=self.py_file,
+ py_options=self.py_options,
+ py_interpreter=self.py_interpreter,
+ py_requirements=self.py_requirements,
+ py_system_site_packages=self.py_system_site_packages,
+ runner=self.runner,
+ ),
+ method_name="execute_complete",
)
+ else:
+ self.defer(
+ trigger=BeamPipelineTrigger(
+ variables=self.snake_case_pipeline_options,
+ py_file=self.py_file,
+ py_options=self.py_options,
+ py_interpreter=self.py_interpreter,
+ py_requirements=self.py_requirements,
+ py_system_site_packages=self.py_system_site_packages,
+ runner=self.runner,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]):
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes execution was
+ successful.
+ """
+ if event["status"] == "error":
+ raise AirflowException(event["message"])
+ self.log.info(
+ "%s completed with response %s ",
+ self.task_id,
+ event["message"],
+ )
+ return {"dataflow_job_id": self.dataflow_job_id}
def on_kill(self) -> None:
if self.dataflow_hook and self.dataflow_job_id:
diff --git a/airflow/providers/apache/beam/provider.yaml b/airflow/providers/apache/beam/provider.yaml
index d177e251d7..dbc59a95af 100644
--- a/airflow/providers/apache/beam/provider.yaml
+++ b/airflow/providers/apache/beam/provider.yaml
@@ -63,6 +63,11 @@ hooks:
python-modules:
- airflow.providers.apache.beam.hooks.beam
+triggers:
+ - integration-name: Apache Beam
+ python-modules:
+ - airflow.providers.apache.beam.triggers.beam
+
additional-extras:
- name: google
dependencies:
diff --git a/airflow/providers/apache/beam/triggers/__init__.py b/airflow/providers/apache/beam/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/apache/beam/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/beam/triggers/beam.py b/airflow/providers/apache/beam/triggers/beam.py
new file mode 100644
index 0000000000..2926a17b45
--- /dev/null
+++ b/airflow/providers/apache/beam/triggers/beam.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, AsyncIterator
+
+from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class BeamPipelineTrigger(BaseTrigger):
+ """
+ Trigger to perform checking the pipeline status until it reaches a terminal state.
+
+ :param variables: Variables passed to the pipeline.
+ :param py_file: Path to the python file to execute.
+ :param py_options: Additional options.
+ :param py_interpreter: Python version of the Apache Beam pipeline. If `None`, this defaults to the
+ python3. To track python versions supported by beam and related issues
+ check: https://issues.apache.org/jira/browse/BEAM-1251
+ :param py_requirements: Additional python package(s) to install.
+ If a value is passed to this parameter, a new virtual environment has been created with
+ additional packages installed.
+
+ You could also install the apache-beam package if it is not installed on your system, or you want
+ to use a different version.
+ :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
+ See virtualenv documentation for more information.
+
+ This option is only relevant if the ``py_requirements`` parameter is not None.
+ :param runner: Runner on which pipeline will be run. By default, "DirectRunner" is being used.
+ Other possible options: DataflowRunner, SparkRunner, FlinkRunner, PortableRunner.
+ See: :class:`~providers.apache.beam.hooks.beam.BeamRunnerType`
+ See: https://beam.apache.org/documentation/runners/capability-matrix/
+ """
+
+ def __init__(
+ self,
+ variables: dict,
+ py_file: str,
+ py_options: list[str] | None = None,
+ py_interpreter: str = "python3",
+ py_requirements: list[str] | None = None,
+ py_system_site_packages: bool = False,
+ runner: str = "DirectRunner",
+ ):
+ super().__init__()
+ self.variables = variables
+ self.py_file = py_file
+ self.py_options = py_options
+ self.py_interpreter = py_interpreter
+ self.py_requirements = py_requirements
+ self.py_system_site_packages = py_system_site_packages
+ self.runner = runner
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ """Serializes BeamPipelineTrigger arguments and classpath."""
+ return (
+ "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger",
+ {
+ "variables": self.variables,
+ "py_file": self.py_file,
+ "py_options": self.py_options,
+ "py_interpreter": self.py_interpreter,
+ "py_requirements": self.py_requirements,
+ "py_system_site_packages": self.py_system_site_packages,
+ "runner": self.runner,
+ },
+ )
+
+ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
+ """Gets current pipeline status and yields a TriggerEvent."""
+ hook = self._get_async_hook()
+ while True:
+ try:
+ return_code = await hook.start_python_pipeline_async(
+ variables=self.variables,
+ py_file=self.py_file,
+ py_options=self.py_options,
+ py_interpreter=self.py_interpreter,
+ py_requirements=self.py_requirements,
+ py_system_site_packages=self.py_system_site_packages,
+ )
+ if return_code == 0:
+ yield TriggerEvent(
+ {
+ "status": "success",
+ "message": "Pipeline has finished SUCCESSFULLY",
+ }
+ )
+ return
+ else:
+ yield TriggerEvent({"status": "error", "message":
"Operation failed"})
+ return
+
+ except Exception as e:
+ self.log.exception("Exception occurred while checking for
pipeline state")
+ yield TriggerEvent({"status": "error", "message": str(e)})
+ return
+
+ def _get_async_hook(self) -> BeamAsyncHook:
+ return BeamAsyncHook(runner=self.runner)
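As a usage note, the trigger's run() method is an async generator that yields a single TriggerEvent once the pipeline subprocess exits. A hedged sketch of exercising it outside the triggerer, with placeholder arguments:

    import asyncio

    from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger


    async def run_trigger_once() -> None:
        trigger = BeamPipelineTrigger(
            variables={"output": "/tmp/beam_output"},
            py_file="apache_beam.examples.wordcount",
            py_options=["-m"],
            runner="DirectRunner",
        )
        # run() yields exactly one event: "success" on return code 0, "error" otherwise.
        event = await trigger.run().asend(None)
        print(event.payload)


    asyncio.run(run_trigger_once())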
diff --git a/docs/apache-airflow-providers-apache-beam/operators.rst b/docs/apache-airflow-providers-apache-beam/operators.rst
index fedf6b0265..f7487fb264 100644
--- a/docs/apache-airflow-providers-apache-beam/operators.rst
+++ b/docs/apache-airflow-providers-apache-beam/operators.rst
@@ -61,6 +61,23 @@ Python Pipelines with DirectRunner
:start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
:end-before: [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
+You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a
+possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger.
+As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a
+lot less resources wasted on idle Operators or Sensors:
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file_async]
+ :end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file_async]
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
+ :end-before: [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
+
Python Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -76,6 +93,18 @@ Python Pipelines with DataflowRunner
:start-after: [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
:end-before: [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+
+You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a
+possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger.
+As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a
+lot less resources wasted on idle Operators or Sensors:
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
+ :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
+
|
|
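In practice, the documented deferrable usage comes down to passing deferrable=True to the operator. A condensed sketch of a DAG doing so; the DAG id, start date, and pinned apache-beam version are placeholders, and the committed example_python_async.py below remains the full example:

    from datetime import datetime

    from airflow import models
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

    with models.DAG(
        dag_id="example_beam_python_deferrable",
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        start_python_pipeline = BeamRunPythonPipelineOperator(
            task_id="start_python_pipeline_deferrable",
            py_file="apache_beam.examples.wordcount",
            py_options=["-m"],
            py_requirements=["apache-beam[gcp]==2.46.0"],
            py_interpreter="python3",
            py_system_site_packages=False,
            deferrable=True,  # hand the wait off to the triggerer instead of holding a worker slot
        )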
diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py
index 3e3e895cf9..1cfe71121c 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -21,12 +21,17 @@ import os
import re
import subprocess
from unittest import mock
-from unittest.mock import ANY, MagicMock
+from unittest.mock import ANY, AsyncMock, MagicMock
import pytest
from airflow.exceptions import AirflowException
-from airflow.providers.apache.beam.hooks.beam import BeamHook, beam_options_to_args, run_beam_command
+from airflow.providers.apache.beam.hooks.beam import (
+ BeamAsyncHook,
+ BeamHook,
+ beam_options_to_args,
+ run_beam_command,
+)
PY_FILE = "apache_beam.examples.wordcount"
JAR_FILE = "unitest.jar"
@@ -391,3 +396,171 @@ class TestBeamOptionsToArgs:
def test_beam_options_to_args(self, options, expected_args):
args = beam_options_to_args(options)
assert args == expected_args
+
+
+class TestBeamAsyncHook:
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+ async def test_start_python_pipeline(self, mock_create_dir, mock_runner):
+ hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+ mock_create_dir.return_value = AsyncMock()
+ mock_runner.return_value = 0
+
+ await hook.start_python_pipeline_async(
+ variables=copy.deepcopy(BEAM_VARIABLES_PY),
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ )
+
+ expected_cmd = [
+ "python3",
+ "-m",
+ PY_FILE,
+ f"--runner={DEFAULT_RUNNER}",
+ "--output=gs://test/output",
+ "--labels=foo=bar",
+ ]
+ mock_create_dir.assert_called_once()
+ mock_runner.assert_called_once_with(
+ cmd=expected_cmd,
+ working_directory=None,
+ log=ANY,
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0")
+ async def test_start_python_pipeline_unsupported_option(self, mock_check_output):
+ hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+
+ with pytest.raises(
+ AirflowException,
+ match=re.escape("The impersonateServiceAccount option requires
Apache Beam 2.39.0 or newer."),
+ ):
+ await hook.start_python_pipeline_async(
+ variables={
+ "impersonate_service_account": "[email protected]",
+ },
+ py_file="/tmp/file.py",
+ py_options=["-m"],
+ py_interpreter="python3",
+ py_requirements=None,
+ py_system_site_packages=False,
+ )
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "py_interpreter",
+ [
+ pytest.param("python", id="default python"),
+ pytest.param("python2", id="major python version 2.x"),
+ pytest.param("python3", id="major python version 3.x"),
+ pytest.param("python3.6", id="major.minor python version"),
+ ],
+ )
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
+ async def test_start_python_pipeline_with_custom_interpreter(
+ self, mock_check_output, mock_create_dir, mock_runner, py_interpreter
+ ):
+ hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+ mock_create_dir.return_value = AsyncMock()
+ mock_runner.return_value = 0
+
+ await hook.start_python_pipeline_async(
+ variables=copy.deepcopy(BEAM_VARIABLES_PY),
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ py_interpreter=py_interpreter,
+ )
+
+ expected_cmd = [
+ py_interpreter,
+ "-m",
+ PY_FILE,
+ f"--runner={DEFAULT_RUNNER}",
+ "--output=gs://test/output",
+ "--labels=foo=bar",
+ ]
+ mock_runner.assert_called_once_with(
+ cmd=expected_cmd,
+ working_directory=None,
+ log=ANY,
+ )
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "current_py_requirements, current_py_system_site_packages",
+ [
+ pytest.param("foo-bar", False, id="requirements without system
site-packages"),
+ pytest.param("foo-bar", True, id="requirements with system
site-packages"),
+ pytest.param([], True, id="only system site-packages"),
+ ],
+ )
+ @mock.patch(BEAM_STRING.format("prepare_virtualenv"))
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._cleanup_tmp_dir")
+ async def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages(
+ self,
+ mock_cleanup_dir,
+ mock_create_dir,
+ mock_check_output,
+ mock_runner,
+ mock_virtualenv,
+ current_py_requirements,
+ current_py_system_site_packages,
+ ):
+ hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+ mock_create_dir.return_value = AsyncMock()
+ mock_virtualenv.return_value = "/dummy_dir/bin/python"
+ mock_cleanup_dir.return_value = AsyncMock()
+
+ await hook.start_python_pipeline_async(
+ variables=copy.deepcopy(BEAM_VARIABLES_PY),
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ py_requirements=current_py_requirements,
+ py_system_site_packages=current_py_system_site_packages,
+ )
+
+ expected_cmd = [
+ "/dummy_dir/bin/python",
+ "-m",
+ PY_FILE,
+ f"--runner={DEFAULT_RUNNER}",
+ "--output=gs://test/output",
+ "--labels=foo=bar",
+ ]
+ mock_runner.assert_called_once_with(
+ cmd=expected_cmd,
+ working_directory=None,
+ log=ANY,
+ )
+ mock_virtualenv.assert_called_once_with(
+ venv_directory=mock.ANY,
+ python_bin="python3",
+ system_site_packages=current_py_system_site_packages,
+ requirements=current_py_requirements,
+ )
+ mock_create_dir.assert_called_once()
+
+ @pytest.mark.asyncio
+ @mock.patch(BEAM_STRING.format("run_beam_command"))
+ @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
+ async def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(
+ self, mock_check_output, mock_runner
+ ):
+ hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+
+ with pytest.raises(AirflowException, match=r"Invalid method invocation\."):
+ await hook.start_python_pipeline_async(
+ variables=copy.deepcopy(BEAM_VARIABLES_PY),
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ py_requirements=[],
+ )
+
+ mock_runner.assert_not_called()
diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py
index 9bdf016f48..c5949dd868 100644
--- a/tests/providers/apache/beam/operators/test_beam.py
+++ b/tests/providers/apache/beam/operators/test_beam.py
@@ -22,11 +22,13 @@ from unittest.mock import MagicMock, call
import pytest
+from airflow.exceptions import AirflowException, TaskDeferred
from airflow.providers.apache.beam.operators.beam import (
BeamRunGoPipelineOperator,
BeamRunJavaPipelineOperator,
BeamRunPythonPipelineOperator,
)
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.version import version
@@ -150,13 +152,6 @@ class TestBeamRunPythonPipelineOperator:
py_system_site_packages=False,
process_line_callback=mock.ANY,
)
- dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with(
- job_id=self.operator.dataflow_job_id,
- job_name=job_name,
- location="us-central1",
- multiple_jobs=False,
- project_id=dataflow_config.project_id,
- )
dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
@@ -268,13 +263,6 @@ class TestBeamRunJavaPipelineOperator:
job_class=JOB_CLASS,
process_line_callback=mock.ANY,
)
- dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with(
- job_id=self.operator.dataflow_job_id,
- job_name=job_name,
- location="us-central1",
- multiple_jobs=False,
- project_id=dataflow_hook_mock.return_value.project_id,
- )
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
@mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
@@ -696,3 +684,142 @@ class TestBeamRunGoPipelineOperator:
self.operator.execute(None)
self.operator.on_kill()
dataflow_cancel_job.assert_not_called()
+
+
+class TestBeamRunPythonPipelineOperatorAsync:
+ def setup_method(self):
+ self.operator = BeamRunPythonPipelineOperator(
+ task_id=TASK_ID,
+ py_file=PY_FILE,
+ py_options=PY_OPTIONS,
+ default_pipeline_options=DEFAULT_OPTIONS,
+ pipeline_options=ADDITIONAL_OPTIONS,
+ deferrable=True,
+ )
+
+ def test_init(self):
+ """Test BeamRunPythonPipelineOperator instance is properly
initialized."""
+ assert self.operator.task_id == TASK_ID
+ assert self.operator.py_file == PY_FILE
+ assert self.operator.runner == DEFAULT_RUNNER
+ assert self.operator.py_options == PY_OPTIONS
+ assert self.operator.py_interpreter == PY_INTERPRETER
+ assert self.operator.default_pipeline_options == DEFAULT_OPTIONS
+ assert self.operator.pipeline_options == EXPECTED_ADDITIONAL_OPTIONS
+
+ @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+ def test_async_execute_should_execute_successfully(self, gcs_hook, beam_hook_mock):
+ """
+ Asserts that a task is deferred and the BeamPipelineTrigger will be fired
+ when the BeamRunPythonPipelineOperator is executed in deferrable mode when deferrable=True.
+ """
+ with pytest.raises(TaskDeferred) as exc:
+ self.operator.execute(context=mock.MagicMock())
+
+ assert isinstance(exc.value.trigger, BeamPipelineTrigger), "Trigger is not a BeamPipelineTrigger"
+
+ def test_async_execute_should_throw_exception(self):
+ """Tests that an AirflowException is raised in case of error event"""
+
+ with pytest.raises(AirflowException):
+ self.operator.execute_complete(
+ context=mock.MagicMock(), event={"status": "error", "message": "test failure message"}
+ )
+
+ def test_async_execute_logging_should_execute_successfully(self):
+ """Asserts that logging occurs as expected"""
+
+ with mock.patch.object(self.operator.log, "info") as mock_log_info:
+ self.operator.execute_complete(
+ context=mock.MagicMock(),
+ event={"status": "success", "message": "Pipeline has finished
SUCCESSFULLY"},
+ )
+ mock_log_info.assert_called_with(
+ "%s completed with response %s ", TASK_ID, "Pipeline has finished
SUCCESSFULLY"
+ )
+
+ @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+ def test_async_execute_direct_runner(self, gcs_hook, beam_hook_mock):
+ """
+ Test BeamHook is created and the right args are passed to
+ start_python_workflow when executing direct runner.
+ """
+ gcs_provide_file = gcs_hook.return_value.provide_file
+ with pytest.raises(TaskDeferred):
+ self.operator.execute(context=mock.MagicMock())
+ beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+ gcs_provide_file.assert_called_once_with(object_url=PY_FILE)
+
+ @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock, persist_link_mock):
+ """
+ Test DataflowHook is created and the right args are passed to
+ start_python_dataflow when executing Dataflow runner.
+ """
+
+ dataflow_config = DataflowConfiguration(impersonation_chain=TEST_IMPERSONATION_ACCOUNT)
+ self.operator.runner = "DataflowRunner"
+ self.operator.dataflow_config = dataflow_config
+ gcs_provide_file = gcs_hook.return_value.provide_file
+ magic_mock = mock.MagicMock()
+ with pytest.raises(TaskDeferred):
+ self.operator.execute(context=magic_mock)
+
+ job_name = dataflow_hook_mock.build_dataflow_job_name.return_value
+ dataflow_hook_mock.assert_called_once_with(
+ gcp_conn_id=dataflow_config.gcp_conn_id,
+ poll_sleep=dataflow_config.poll_sleep,
+ impersonation_chain=dataflow_config.impersonation_chain,
+ drain_pipeline=dataflow_config.drain_pipeline,
+ cancel_timeout=dataflow_config.cancel_timeout,
+ wait_until_finished=dataflow_config.wait_until_finished,
+ )
+ expected_options = {
+ "project": dataflow_hook_mock.return_value.project_id,
+ "job_name": job_name,
+ "staging_location": "gs://test/staging",
+ "output": "gs://test/output",
+ "labels": {"foo": "bar", "airflow-version": TEST_VERSION},
+ "region": "us-central1",
+ "impersonate_service_account": TEST_IMPERSONATION_ACCOUNT,
+ }
+ gcs_provide_file.assert_called_once_with(object_url=PY_FILE)
+ persist_link_mock.assert_called_once_with(
+ self.operator,
+ magic_mock,
+ expected_options["project"],
+ expected_options["region"],
+ self.operator.dataflow_job_id,
+ )
+ beam_hook_mock.return_value.start_python_pipeline.assert_not_called()
+ dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
+
+ @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+ def test_on_kill_dataflow_runner(self, dataflow_hook_mock, _, __, ___):
+ self.operator.runner = "DataflowRunner"
+ dataflow_cancel_job = dataflow_hook_mock.return_value.cancel_job
+ with pytest.raises(TaskDeferred):
+ self.operator.execute(context=mock.MagicMock())
+ self.operator.dataflow_job_id = JOB_ID
+ self.operator.on_kill()
+ dataflow_cancel_job.assert_called_once_with(
+ job_id=JOB_ID, project_id=self.operator.dataflow_config.project_id
+ )
+
+ @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+ @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+ def test_on_kill_direct_runner(self, _, dataflow_mock, __):
+ dataflow_cancel_job = dataflow_mock.return_value.cancel_job
+ with pytest.raises(TaskDeferred):
+ self.operator.execute(mock.MagicMock())
+ self.operator.on_kill()
+ dataflow_cancel_job.assert_not_called()
diff --git a/tests/providers/apache/beam/triggers/__init__.py b/tests/providers/apache/beam/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/apache/beam/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/beam/triggers/test_beam.py b/tests/providers/apache/beam/triggers/test_beam.py
new file mode 100644
index 0000000000..ce22dda215
--- /dev/null
+++ b/tests/providers/apache/beam/triggers/test_beam.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
+from airflow.triggers.base import TriggerEvent
+
+HOOK_STATUS_STR = "airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.start_python_pipeline_async"
+CLASSPATH = "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger"
+
+TASK_ID = "test_task"
+LOCATION = "test-location"
+INSTANCE_NAME = "airflow-test-instance"
+INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+PROJECT_ID = "test_project_id"
+TEST_VARIABLES = {"output": "gs://bucket_test/output", "labels":
{"airflow-version": "v2-7-0-dev0"}}
+TEST_PY_FILE = "apache_beam.examples.wordcount"
+TEST_PY_OPTIONS = []
+TEST_PY_INTERPRETER = "python3"
+TEST_PY_REQUIREMENTS = ["apache-beam[gcp]==2.46.0"]
+TEST_PY_PACKAGES = False
+TEST_RUNNER = "DirectRunner"
+
+
+@pytest.fixture
+def trigger():
+ return BeamPipelineTrigger(
+ variables=TEST_VARIABLES,
+ py_file=TEST_PY_FILE,
+ py_options=TEST_PY_OPTIONS,
+ py_interpreter=TEST_PY_INTERPRETER,
+ py_requirements=TEST_PY_REQUIREMENTS,
+ py_system_site_packages=TEST_PY_PACKAGES,
+ runner=TEST_RUNNER,
+ )
+
+
+class TestBeamPipelineTrigger:
+ def test_beam_trigger_serialization_should_execute_successfully(self, trigger):
+ """
+ Asserts that the BeamPipelineTrigger correctly serializes its arguments
+ and classpath.
+ """
+ classpath, kwargs = trigger.serialize()
+ assert classpath == CLASSPATH
+ assert kwargs == {
+ "variables": TEST_VARIABLES,
+ "py_file": TEST_PY_FILE,
+ "py_options": TEST_PY_OPTIONS,
+ "py_interpreter": TEST_PY_INTERPRETER,
+ "py_requirements": TEST_PY_REQUIREMENTS,
+ "py_system_site_packages": TEST_PY_PACKAGES,
+ "runner": TEST_RUNNER,
+ }
+
+ @pytest.mark.asyncio
+ @mock.patch(HOOK_STATUS_STR)
+ async def test_beam_trigger_on_success_should_execute_successfully(self, mock_pipeline_status, trigger):
+ """
+ Tests the BeamPipelineTrigger only fires once the job execution reaches a successful state.
+ """
+ mock_pipeline_status.return_value = 0
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "success", "message": "Pipeline has
finished SUCCESSFULLY"}) == actual
+
+ @pytest.mark.asyncio
+ @mock.patch(HOOK_STATUS_STR)
+ async def test_beam_trigger_error_should_execute_successfully(self, mock_pipeline_status, trigger):
+ """
+ Test that BeamPipelineTrigger fires the correct event in case of an error.
+ """
+ mock_pipeline_status.return_value = 1
+
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "Operation
failed"}) == actual
+
+ @pytest.mark.asyncio
+ @mock.patch(HOOK_STATUS_STR)
+ async def test_beam_trigger_exception_should_execute_successfully(self, mock_pipeline_status, trigger):
+ """
+ Test that BeamPipelineTrigger fires the correct event in case of an error.
+ """
+ mock_pipeline_status.side_effect = Exception("Test exception")
+
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ assert TriggerEvent({"status": "error", "message": "Test exception"})
== actual
diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python.py
index 50c469c73d..929844ec54 100644
--- a/tests/system/providers/apache/beam/example_python.py
+++ b/tests/system/providers/apache/beam/example_python.py
@@ -47,7 +47,7 @@ with models.DAG(
task_id="start_python_pipeline_local_direct_runner",
py_file="apache_beam.examples.wordcount",
py_options=["-m"],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
@@ -59,7 +59,7 @@ with models.DAG(
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={"output": GCS_OUTPUT},
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
@@ -76,7 +76,7 @@ with models.DAG(
"output": GCS_OUTPUT,
},
py_options=[],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
@@ -90,7 +90,7 @@ with models.DAG(
py_file="apache_beam.examples.wordcount",
runner="SparkRunner",
py_options=["-m"],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
@@ -103,7 +103,7 @@ with models.DAG(
pipeline_options={
"output": "/tmp/start_python_pipeline_local_flink_runner",
},
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
@@ -113,6 +113,7 @@ with models.DAG(
start_python_pipeline_local_direct_runner,
start_python_pipeline_direct_runner,
]
+ >> start_python_pipeline_dataflow_runner
>> start_python_pipeline_local_flink_runner
>> start_python_pipeline_local_spark_runner
)
diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python_async.py
similarity index 82%
copy from tests/system/providers/apache/beam/example_python.py
copy to tests/system/providers/apache/beam/example_python_async.py
index 50c469c73d..520e8adc4c 100644
--- a/tests/system/providers/apache/beam/example_python.py
+++ b/tests/system/providers/apache/beam/example_python_async.py
@@ -34,7 +34,7 @@ from tests.system.providers.apache.beam.utils import (
)
with models.DAG(
- "example_beam_native_python",
+ dag_id="dataflow_native_python_async",
start_date=START_DATE,
schedule=None, # Override to match your needs
catchup=False,
@@ -42,30 +42,32 @@ with models.DAG(
tags=["example"],
) as dag:
- # [START howto_operator_start_python_direct_runner_pipeline_local_file]
+ # [START howto_operator_start_python_direct_runner_pipeline_local_file_async]
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_local_direct_runner",
py_file="apache_beam.examples.wordcount",
py_options=["-m"],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
+ deferrable=True,
)
- # [END howto_operator_start_python_direct_runner_pipeline_local_file]
+ # [END howto_operator_start_python_direct_runner_pipeline_local_file_async]
- # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
+ # [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_direct_runner",
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={"output": GCS_OUTPUT},
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
+ deferrable=True,
)
- # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
+ # [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
- # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+ # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_dataflow_runner",
runner="DataflowRunner",
@@ -76,25 +78,30 @@ with models.DAG(
"output": GCS_OUTPUT,
},
py_options=[],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID,
location="us-central1"
),
+ deferrable=True,
)
- # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+ # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
+ # [START howto_operator_start_python_pipeline_local_runner_spark_runner_async]
start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_local_spark_runner",
py_file="apache_beam.examples.wordcount",
runner="SparkRunner",
py_options=["-m"],
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
+ deferrable=True,
)
+ # [END howto_operator_start_python_pipeline_local_runner_spark_runner_async]
+ # [START howto_operator_start_python_pipeline_local_runner_flink_runner_async]
start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_local_flink_runner",
py_file="apache_beam.examples.wordcount",
@@ -103,16 +110,19 @@ with models.DAG(
pipeline_options={
"output": "/tmp/start_python_pipeline_local_flink_runner",
},
- py_requirements=["apache-beam[gcp]==2.26.0"],
+ py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
+ deferrable=True,
)
+ # [END howto_operator_start_python_pipeline_local_runner_flink_runner_async]
(
[
start_python_pipeline_local_direct_runner,
start_python_pipeline_direct_runner,
]
+ >> start_python_pipeline_dataflow_runner
>> start_python_pipeline_local_flink_runner
>> start_python_pipeline_local_spark_runner
)