This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit eef315fff7be4e132b47fbc521554d9f6861120f Author: Zion <sionamsa...@gmail.com> AuthorDate: Fri Feb 23 21:04:36 2024 +0200 fix ImportError on examples dags (#37571) * fix(main): fix ImportError on examples dags * thanks @eladkal * @Taragolis thanks for the review, you're absolutely right. bad styling, overthinking. * Fix static check failure * Update tests/system/README.md * Update dev/PROJECT_GUIDELINES.md --------- Co-authored-by: Elad Kalif <45845474+elad...@users.noreply.github.com> (cherry picked from commit e0fc8034dc541eedcad6f2b1eacb6e9cd312be6e) --- airflow/example_dags/example_branch_operator.py | 217 +++++++++++---------- .../example_branch_operator_decorator.py | 155 ++++++++------- 2 files changed, 190 insertions(+), 182 deletions(-) diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 594c6a4cb1..806652d9ba 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -28,140 +28,143 @@ from pathlib import Path import pendulum -from airflow.models.dag import DAG -from airflow.operators.empty import EmptyOperator -from airflow.operators.python import ( - BranchExternalPythonOperator, - BranchPythonOperator, - BranchPythonVirtualenvOperator, - ExternalPythonOperator, - PythonOperator, - PythonVirtualenvOperator, -) -from airflow.utils.edgemodifier import Label -from airflow.utils.trigger_rule import TriggerRule - -PATH_TO_PYTHON_BINARY = sys.executable - -with DAG( - dag_id="example_branch_operator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", -) as dag: - run_this_first = EmptyOperator( - task_id="run_this_first", +from airflow.operators.python import is_venv_installed + +if is_venv_installed(): + from airflow.models.dag import DAG + from airflow.operators.empty import EmptyOperator + from airflow.operators.python import ( + 
BranchExternalPythonOperator, + BranchPythonOperator, + BranchPythonVirtualenvOperator, + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, ) + from airflow.utils.edgemodifier import Label + from airflow.utils.trigger_rule import TriggerRule + + PATH_TO_PYTHON_BINARY = sys.executable + + with DAG( + dag_id="example_branch_operator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", "example2"], + orientation="TB", + ) as dag: + run_this_first = EmptyOperator( + task_id="run_this_first", + ) - options = ["a", "b", "c", "d"] - - # Example branching on standard Python tasks + options = ["a", "b", "c", "d"] - # [START howto_operator_branch_python] - branching = BranchPythonOperator( - task_id="branching", - python_callable=lambda: f"branch_{random.choice(options)}", - ) - # [END howto_operator_branch_python] - run_this_first >> branching - - join = EmptyOperator( - task_id="join", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - ) + # Example branching on standard Python tasks - for option in options: - t = PythonOperator( - task_id=f"branch_{option}", - python_callable=lambda: print("Hello World"), + # [START howto_operator_branch_python] + branching = BranchPythonOperator( + task_id="branching", + python_callable=lambda: f"branch_{random.choice(options)}", ) + # [END howto_operator_branch_python] + run_this_first >> branching - empty_follow = EmptyOperator( - task_id="follow_" + option, + join = EmptyOperator( + task_id="join", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, ) - # Label is optional here, but it can help identify more complex branches - branching >> Label(option) >> t >> empty_follow >> join + for option in options: + t = PythonOperator( + task_id=f"branch_{option}", + python_callable=lambda: print("Hello World"), + ) - # Example the same with external Python calls + empty_follow = EmptyOperator( + task_id="follow_" + option, + ) - # [START 
howto_operator_branch_ext_py] - def branch_with_external_python(choices): - import random + # Label is optional here, but it can help identify more complex branches + branching >> Label(option) >> t >> empty_follow >> join - return f"ext_py_{random.choice(choices)}" + # Example the same with external Python calls - branching_ext_py = BranchExternalPythonOperator( - task_id="branching_ext_python", - python=PATH_TO_PYTHON_BINARY, - python_callable=branch_with_external_python, - op_args=[options], - ) - # [END howto_operator_branch_ext_py] - join >> branching_ext_py - - join_ext_py = EmptyOperator( - task_id="join_ext_python", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - ) + # [START howto_operator_branch_ext_py] + def branch_with_external_python(choices): + import random - def hello_world_with_external_python(): - print("Hello World from external Python") + return f"ext_py_{random.choice(choices)}" - for option in options: - t = ExternalPythonOperator( - task_id=f"ext_py_{option}", + branching_ext_py = BranchExternalPythonOperator( + task_id="branching_ext_python", python=PATH_TO_PYTHON_BINARY, - python_callable=hello_world_with_external_python, + python_callable=branch_with_external_python, + op_args=[options], ) + # [END howto_operator_branch_ext_py] + join >> branching_ext_py - # Label is optional here, but it can help identify more complex branches - branching_ext_py >> Label(option) >> t >> join_ext_py - - # Example the same with Python virtual environments + join_ext_py = EmptyOperator( + task_id="join_ext_python", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. 
- VENV_CACHE_PATH = Path(tempfile.gettempdir()) + def hello_world_with_external_python(): + print("Hello World from external Python") - def branch_with_venv(choices): - import random + for option in options: + t = ExternalPythonOperator( + task_id=f"ext_py_{option}", + python=PATH_TO_PYTHON_BINARY, + python_callable=hello_world_with_external_python, + ) - import numpy as np + # Label is optional here, but it can help identify more complex branches + branching_ext_py >> Label(option) >> t >> join_ext_py - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + # Example the same with Python virtual environments - branching_venv = BranchPythonVirtualenvOperator( - task_id="branching_venv", - requirements=["numpy~=1.24.4"], - venv_cache_path=VENV_CACHE_PATH, - python_callable=branch_with_venv, - op_args=[options], - ) - # [END howto_operator_branch_virtualenv] - join_ext_py >> branching_venv + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. 
+ VENV_CACHE_PATH = Path(tempfile.gettempdir()) - join_venv = EmptyOperator( - task_id="join_venv", - trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, - ) + def branch_with_venv(choices): + import random - def hello_world_with_venv(): - import numpy as np + import numpy as np - print(f"Hello World with some numpy stuff: {np.arange(6)}") + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - for option in options: - t = PythonVirtualenvOperator( - task_id=f"venv_{option}", + branching_venv = BranchPythonVirtualenvOperator( + task_id="branching_venv", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH, - python_callable=hello_world_with_venv, + python_callable=branch_with_venv, + op_args=[options], + ) + # [END howto_operator_branch_virtualenv] + join_ext_py >> branching_venv + + join_venv = EmptyOperator( + task_id="join_venv", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, ) - # Label is optional here, but it can help identify more complex branches - branching_venv >> Label(option) >> t >> join_venv + def hello_world_with_venv(): + import numpy as np + + print(f"Hello World with some numpy stuff: {np.arange(6)}") + + for option in options: + t = PythonVirtualenvOperator( + task_id=f"venv_{option}", + requirements=["numpy~=1.24.4"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=hello_world_with_venv, + ) + + # Label is optional here, but it can help identify more complex branches + branching_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index 5d42ff6b27..7b9486157d 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -29,116 +29,121 @@ import tempfile import pendulum -from airflow.decorators import task -from airflow.models.dag import DAG -from airflow.operators.empty import EmptyOperator 
-from airflow.utils.edgemodifier import Label -from airflow.utils.trigger_rule import TriggerRule +from airflow.operators.python import is_venv_installed -PATH_TO_PYTHON_BINARY = sys.executable +if is_venv_installed(): + from airflow.decorators import task + from airflow.models.dag import DAG + from airflow.operators.empty import EmptyOperator + from airflow.utils.edgemodifier import Label + from airflow.utils.trigger_rule import TriggerRule -with DAG( - dag_id="example_branch_python_operator_decorator", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - schedule="@daily", - tags=["example", "example2"], - orientation="TB", -) as dag: - run_this_first = EmptyOperator(task_id="run_this_first") + PATH_TO_PYTHON_BINARY = sys.executable - options = ["a", "b", "c", "d"] + with DAG( + dag_id="example_branch_python_operator_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + schedule="@daily", + tags=["example", "example2"], + orientation="TB", + ) as dag: + run_this_first = EmptyOperator(task_id="run_this_first") - # Example branching on standard Python tasks + options = ["a", "b", "c", "d"] - # [START howto_operator_branch_python] - @task.branch() - def branching(choices: list[str]) -> str: - return f"branch_{random.choice(choices)}" + # Example branching on standard Python tasks - # [END howto_operator_branch_python] + # [START howto_operator_branch_python] + @task.branch() + def branching(choices: list[str]) -> str: + return f"branch_{random.choice(choices)}" - random_choice_instance = branching(choices=options) + # [END howto_operator_branch_python] - run_this_first >> random_choice_instance + random_choice_instance = branching(choices=options) - join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + run_this_first >> random_choice_instance - for option in options: + join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - 
@task(task_id=f"branch_{option}") - def some_task(): - print("doing something in Python") + for option in options: - t = some_task() - empty = EmptyOperator(task_id=f"follow_{option}") + @task(task_id=f"branch_{option}") + def some_task(): + print("doing something in Python") - # Label is optional here, but it can help identify more complex branches - random_choice_instance >> Label(option) >> t >> empty >> join + t = some_task() + empty = EmptyOperator(task_id=f"follow_{option}") - # Example the same with external Python calls + # Label is optional here, but it can help identify more complex branches + random_choice_instance >> Label(option) >> t >> empty >> join - # [START howto_operator_branch_ext_py] - @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) - def branching_ext_python(choices) -> str: - import random + # Example the same with external Python calls - return f"ext_py_{random.choice(choices)}" + # [START howto_operator_branch_ext_py] + @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) + def branching_ext_python(choices) -> str: + import random - # [END howto_operator_branch_ext_py] + return f"ext_py_{random.choice(choices)}" - random_choice_ext_py = branching_ext_python(choices=options) + # [END howto_operator_branch_ext_py] - join >> random_choice_ext_py + random_choice_ext_py = branching_ext_python(choices=options) - join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + join >> random_choice_ext_py - for option in options: + join_ext_py = EmptyOperator( + task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS + ) - @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) - def some_ext_py_task(): - print("doing something in external Python") + for option in options: - t = some_ext_py_task() + @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) + def some_ext_py_task(): + print("doing something in external 
Python") - # Label is optional here, but it can help identify more complex branches - random_choice_ext_py >> Label(option) >> t >> join_ext_py + t = some_ext_py_task() - # Example the same with Python virtual environments + # Label is optional here, but it can help identify more complex branches + random_choice_ext_py >> Label(option) >> t >> join_ext_py - # [START howto_operator_branch_virtualenv] - # Note: Passing a caching dir allows to keep the virtual environment over multiple runs - # Run the example a second time and see that it re-uses it and is faster. - VENV_CACHE_PATH = tempfile.gettempdir() + # Example the same with Python virtual environments - @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) - def branching_virtualenv(choices) -> str: - import random + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. 
+ VENV_CACHE_PATH = tempfile.gettempdir() - import numpy as np + @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) + def branching_virtualenv(choices) -> str: + import random - print(f"Some numpy stuff: {np.arange(6)}") - return f"venv_{random.choice(choices)}" + import numpy as np - # [END howto_operator_branch_virtualenv] + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" - random_choice_venv = branching_virtualenv(choices=options) + # [END howto_operator_branch_virtualenv] - join_ext_py >> random_choice_venv + random_choice_venv = branching_virtualenv(choices=options) - join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + join_ext_py >> random_choice_venv - for option in options: + join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) - @task.virtualenv( - task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH - ) - def some_venv_task(): - import numpy as np + for option in options: - print(f"Some numpy stuff: {np.arange(6)}") + @task.virtualenv( + task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH + ) + def some_venv_task(): + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") - t = some_venv_task() + t = some_venv_task() - # Label is optional here, but it can help identify more complex branches - random_choice_venv >> Label(option) >> t >> join_venv + # Label is optional here, but it can help identify more complex branches + random_choice_venv >> Label(option) >> t >> join_venv