This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eef315fff7be4e132b47fbc521554d9f6861120f
Author: Zion <sionamsa...@gmail.com>
AuthorDate: Fri Feb 23 21:04:36 2024 +0200

    fix ImportError on examples dags (#37571)
    
    * fix(main): fix ImportError on examples dags
    
    * thanks @eladkal
    
    * @Taragolis thanks for the review, you're absolutely right. Bad styling, overthinking.
    
    * Fix static check failure
    
    * Update tests/system/README.md
    
    * Update dev/PROJECT_GUIDELINES.md
    
    ---------
    
    Co-authored-by: Elad Kalif <45845474+elad...@users.noreply.github.com>
    (cherry picked from commit e0fc8034dc541eedcad6f2b1eacb6e9cd312be6e)
---
 airflow/example_dags/example_branch_operator.py    | 217 +++++++++++----------
 .../example_branch_operator_decorator.py           | 155 ++++++++-------
 2 files changed, 190 insertions(+), 182 deletions(-)

diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index 594c6a4cb1..806652d9ba 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -28,140 +28,143 @@ from pathlib import Path
 
 import pendulum
 
-from airflow.models.dag import DAG
-from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import (
-    BranchExternalPythonOperator,
-    BranchPythonOperator,
-    BranchPythonVirtualenvOperator,
-    ExternalPythonOperator,
-    PythonOperator,
-    PythonVirtualenvOperator,
-)
-from airflow.utils.edgemodifier import Label
-from airflow.utils.trigger_rule import TriggerRule
-
-PATH_TO_PYTHON_BINARY = sys.executable
-
-with DAG(
-    dag_id="example_branch_operator",
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    schedule="@daily",
-    tags=["example", "example2"],
-    orientation="TB",
-) as dag:
-    run_this_first = EmptyOperator(
-        task_id="run_this_first",
+from airflow.operators.python import is_venv_installed
+
+if is_venv_installed():
+    from airflow.models.dag import DAG
+    from airflow.operators.empty import EmptyOperator
+    from airflow.operators.python import (
+        BranchExternalPythonOperator,
+        BranchPythonOperator,
+        BranchPythonVirtualenvOperator,
+        ExternalPythonOperator,
+        PythonOperator,
+        PythonVirtualenvOperator,
     )
+    from airflow.utils.edgemodifier import Label
+    from airflow.utils.trigger_rule import TriggerRule
+
+    PATH_TO_PYTHON_BINARY = sys.executable
+
+    with DAG(
+        dag_id="example_branch_operator",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        catchup=False,
+        schedule="@daily",
+        tags=["example", "example2"],
+        orientation="TB",
+    ) as dag:
+        run_this_first = EmptyOperator(
+            task_id="run_this_first",
+        )
 
-    options = ["a", "b", "c", "d"]
-
-    # Example branching on standard Python tasks
+        options = ["a", "b", "c", "d"]
 
-    # [START howto_operator_branch_python]
-    branching = BranchPythonOperator(
-        task_id="branching",
-        python_callable=lambda: f"branch_{random.choice(options)}",
-    )
-    # [END howto_operator_branch_python]
-    run_this_first >> branching
-
-    join = EmptyOperator(
-        task_id="join",
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
-    )
+        # Example branching on standard Python tasks
 
-    for option in options:
-        t = PythonOperator(
-            task_id=f"branch_{option}",
-            python_callable=lambda: print("Hello World"),
+        # [START howto_operator_branch_python]
+        branching = BranchPythonOperator(
+            task_id="branching",
+            python_callable=lambda: f"branch_{random.choice(options)}",
         )
+        # [END howto_operator_branch_python]
+        run_this_first >> branching
 
-        empty_follow = EmptyOperator(
-            task_id="follow_" + option,
+        join = EmptyOperator(
+            task_id="join",
+            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
         )
 
-        # Label is optional here, but it can help identify more complex 
branches
-        branching >> Label(option) >> t >> empty_follow >> join
+        for option in options:
+            t = PythonOperator(
+                task_id=f"branch_{option}",
+                python_callable=lambda: print("Hello World"),
+            )
 
-    # Example the same with external Python calls
+            empty_follow = EmptyOperator(
+                task_id="follow_" + option,
+            )
 
-    # [START howto_operator_branch_ext_py]
-    def branch_with_external_python(choices):
-        import random
+            # Label is optional here, but it can help identify more complex 
branches
+            branching >> Label(option) >> t >> empty_follow >> join
 
-        return f"ext_py_{random.choice(choices)}"
+        # Example the same with external Python calls
 
-    branching_ext_py = BranchExternalPythonOperator(
-        task_id="branching_ext_python",
-        python=PATH_TO_PYTHON_BINARY,
-        python_callable=branch_with_external_python,
-        op_args=[options],
-    )
-    # [END howto_operator_branch_ext_py]
-    join >> branching_ext_py
-
-    join_ext_py = EmptyOperator(
-        task_id="join_ext_python",
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
-    )
+        # [START howto_operator_branch_ext_py]
+        def branch_with_external_python(choices):
+            import random
 
-    def hello_world_with_external_python():
-        print("Hello World from external Python")
+            return f"ext_py_{random.choice(choices)}"
 
-    for option in options:
-        t = ExternalPythonOperator(
-            task_id=f"ext_py_{option}",
+        branching_ext_py = BranchExternalPythonOperator(
+            task_id="branching_ext_python",
             python=PATH_TO_PYTHON_BINARY,
-            python_callable=hello_world_with_external_python,
+            python_callable=branch_with_external_python,
+            op_args=[options],
         )
+        # [END howto_operator_branch_ext_py]
+        join >> branching_ext_py
 
-        # Label is optional here, but it can help identify more complex 
branches
-        branching_ext_py >> Label(option) >> t >> join_ext_py
-
-    # Example the same with Python virtual environments
+        join_ext_py = EmptyOperator(
+            task_id="join_ext_python",
+            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
+        )
 
-    # [START howto_operator_branch_virtualenv]
-    # Note: Passing a caching dir allows to keep the virtual environment over 
multiple runs
-    #       Run the example a second time and see that it re-uses it and is 
faster.
-    VENV_CACHE_PATH = Path(tempfile.gettempdir())
+        def hello_world_with_external_python():
+            print("Hello World from external Python")
 
-    def branch_with_venv(choices):
-        import random
+        for option in options:
+            t = ExternalPythonOperator(
+                task_id=f"ext_py_{option}",
+                python=PATH_TO_PYTHON_BINARY,
+                python_callable=hello_world_with_external_python,
+            )
 
-        import numpy as np
+            # Label is optional here, but it can help identify more complex 
branches
+            branching_ext_py >> Label(option) >> t >> join_ext_py
 
-        print(f"Some numpy stuff: {np.arange(6)}")
-        return f"venv_{random.choice(choices)}"
+        # Example the same with Python virtual environments
 
-    branching_venv = BranchPythonVirtualenvOperator(
-        task_id="branching_venv",
-        requirements=["numpy~=1.24.4"],
-        venv_cache_path=VENV_CACHE_PATH,
-        python_callable=branch_with_venv,
-        op_args=[options],
-    )
-    # [END howto_operator_branch_virtualenv]
-    join_ext_py >> branching_venv
+        # [START howto_operator_branch_virtualenv]
+        # Note: Passing a caching dir allows to keep the virtual environment 
over multiple runs
+        #       Run the example a second time and see that it re-uses it and 
is faster.
+        VENV_CACHE_PATH = Path(tempfile.gettempdir())
 
-    join_venv = EmptyOperator(
-        task_id="join_venv",
-        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
-    )
+        def branch_with_venv(choices):
+            import random
 
-    def hello_world_with_venv():
-        import numpy as np
+            import numpy as np
 
-        print(f"Hello World with some numpy stuff: {np.arange(6)}")
+            print(f"Some numpy stuff: {np.arange(6)}")
+            return f"venv_{random.choice(choices)}"
 
-    for option in options:
-        t = PythonVirtualenvOperator(
-            task_id=f"venv_{option}",
+        branching_venv = BranchPythonVirtualenvOperator(
+            task_id="branching_venv",
             requirements=["numpy~=1.24.4"],
             venv_cache_path=VENV_CACHE_PATH,
-            python_callable=hello_world_with_venv,
+            python_callable=branch_with_venv,
+            op_args=[options],
+        )
+        # [END howto_operator_branch_virtualenv]
+        join_ext_py >> branching_venv
+
+        join_venv = EmptyOperator(
+            task_id="join_venv",
+            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
         )
 
-        # Label is optional here, but it can help identify more complex 
branches
-        branching_venv >> Label(option) >> t >> join_venv
+        def hello_world_with_venv():
+            import numpy as np
+
+            print(f"Hello World with some numpy stuff: {np.arange(6)}")
+
+        for option in options:
+            t = PythonVirtualenvOperator(
+                task_id=f"venv_{option}",
+                requirements=["numpy~=1.24.4"],
+                venv_cache_path=VENV_CACHE_PATH,
+                python_callable=hello_world_with_venv,
+            )
+
+            # Label is optional here, but it can help identify more complex 
branches
+            branching_venv >> Label(option) >> t >> join_venv
diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py
index 5d42ff6b27..7b9486157d 100644
--- a/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow/example_dags/example_branch_operator_decorator.py
@@ -29,116 +29,121 @@ import tempfile
 
 import pendulum
 
-from airflow.decorators import task
-from airflow.models.dag import DAG
-from airflow.operators.empty import EmptyOperator
-from airflow.utils.edgemodifier import Label
-from airflow.utils.trigger_rule import TriggerRule
+from airflow.operators.python import is_venv_installed
 
-PATH_TO_PYTHON_BINARY = sys.executable
+if is_venv_installed():
+    from airflow.decorators import task
+    from airflow.models.dag import DAG
+    from airflow.operators.empty import EmptyOperator
+    from airflow.utils.edgemodifier import Label
+    from airflow.utils.trigger_rule import TriggerRule
 
-with DAG(
-    dag_id="example_branch_python_operator_decorator",
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    schedule="@daily",
-    tags=["example", "example2"],
-    orientation="TB",
-) as dag:
-    run_this_first = EmptyOperator(task_id="run_this_first")
+    PATH_TO_PYTHON_BINARY = sys.executable
 
-    options = ["a", "b", "c", "d"]
+    with DAG(
+        dag_id="example_branch_python_operator_decorator",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        catchup=False,
+        schedule="@daily",
+        tags=["example", "example2"],
+        orientation="TB",
+    ) as dag:
+        run_this_first = EmptyOperator(task_id="run_this_first")
 
-    # Example branching on standard Python tasks
+        options = ["a", "b", "c", "d"]
 
-    # [START howto_operator_branch_python]
-    @task.branch()
-    def branching(choices: list[str]) -> str:
-        return f"branch_{random.choice(choices)}"
+        # Example branching on standard Python tasks
 
-    # [END howto_operator_branch_python]
+        # [START howto_operator_branch_python]
+        @task.branch()
+        def branching(choices: list[str]) -> str:
+            return f"branch_{random.choice(choices)}"
 
-    random_choice_instance = branching(choices=options)
+        # [END howto_operator_branch_python]
 
-    run_this_first >> random_choice_instance
+        random_choice_instance = branching(choices=options)
 
-    join = EmptyOperator(task_id="join", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+        run_this_first >> random_choice_instance
 
-    for option in options:
+        join = EmptyOperator(task_id="join", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
 
-        @task(task_id=f"branch_{option}")
-        def some_task():
-            print("doing something in Python")
+        for option in options:
 
-        t = some_task()
-        empty = EmptyOperator(task_id=f"follow_{option}")
+            @task(task_id=f"branch_{option}")
+            def some_task():
+                print("doing something in Python")
 
-        # Label is optional here, but it can help identify more complex 
branches
-        random_choice_instance >> Label(option) >> t >> empty >> join
+            t = some_task()
+            empty = EmptyOperator(task_id=f"follow_{option}")
 
-    # Example the same with external Python calls
+            # Label is optional here, but it can help identify more complex 
branches
+            random_choice_instance >> Label(option) >> t >> empty >> join
 
-    # [START howto_operator_branch_ext_py]
-    @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
-    def branching_ext_python(choices) -> str:
-        import random
+        # Example the same with external Python calls
 
-        return f"ext_py_{random.choice(choices)}"
+        # [START howto_operator_branch_ext_py]
+        @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
+        def branching_ext_python(choices) -> str:
+            import random
 
-    # [END howto_operator_branch_ext_py]
+            return f"ext_py_{random.choice(choices)}"
 
-    random_choice_ext_py = branching_ext_python(choices=options)
+        # [END howto_operator_branch_ext_py]
 
-    join >> random_choice_ext_py
+        random_choice_ext_py = branching_ext_python(choices=options)
 
-    join_ext_py = EmptyOperator(task_id="join_ext_py", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+        join >> random_choice_ext_py
 
-    for option in options:
+        join_ext_py = EmptyOperator(
+            task_id="join_ext_py", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
+        )
 
-        @task.external_python(task_id=f"ext_py_{option}", 
python=PATH_TO_PYTHON_BINARY)
-        def some_ext_py_task():
-            print("doing something in external Python")
+        for option in options:
 
-        t = some_ext_py_task()
+            @task.external_python(task_id=f"ext_py_{option}", 
python=PATH_TO_PYTHON_BINARY)
+            def some_ext_py_task():
+                print("doing something in external Python")
 
-        # Label is optional here, but it can help identify more complex 
branches
-        random_choice_ext_py >> Label(option) >> t >> join_ext_py
+            t = some_ext_py_task()
 
-    # Example the same with Python virtual environments
+            # Label is optional here, but it can help identify more complex 
branches
+            random_choice_ext_py >> Label(option) >> t >> join_ext_py
 
-    # [START howto_operator_branch_virtualenv]
-    # Note: Passing a caching dir allows to keep the virtual environment over 
multiple runs
-    #       Run the example a second time and see that it re-uses it and is 
faster.
-    VENV_CACHE_PATH = tempfile.gettempdir()
+        # Example the same with Python virtual environments
 
-    @task.branch_virtualenv(requirements=["numpy~=1.24.4"], 
venv_cache_path=VENV_CACHE_PATH)
-    def branching_virtualenv(choices) -> str:
-        import random
+        # [START howto_operator_branch_virtualenv]
+        # Note: Passing a caching dir allows to keep the virtual environment 
over multiple runs
+        #       Run the example a second time and see that it re-uses it and 
is faster.
+        VENV_CACHE_PATH = tempfile.gettempdir()
 
-        import numpy as np
+        @task.branch_virtualenv(requirements=["numpy~=1.24.4"], 
venv_cache_path=VENV_CACHE_PATH)
+        def branching_virtualenv(choices) -> str:
+            import random
 
-        print(f"Some numpy stuff: {np.arange(6)}")
-        return f"venv_{random.choice(choices)}"
+            import numpy as np
 
-    # [END howto_operator_branch_virtualenv]
+            print(f"Some numpy stuff: {np.arange(6)}")
+            return f"venv_{random.choice(choices)}"
 
-    random_choice_venv = branching_virtualenv(choices=options)
+        # [END howto_operator_branch_virtualenv]
 
-    join_ext_py >> random_choice_venv
+        random_choice_venv = branching_virtualenv(choices=options)
 
-    join_venv = EmptyOperator(task_id="join_venv", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+        join_ext_py >> random_choice_venv
 
-    for option in options:
+        join_venv = EmptyOperator(task_id="join_venv", 
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
 
-        @task.virtualenv(
-            task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], 
venv_cache_path=VENV_CACHE_PATH
-        )
-        def some_venv_task():
-            import numpy as np
+        for option in options:
 
-            print(f"Some numpy stuff: {np.arange(6)}")
+            @task.virtualenv(
+                task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], 
venv_cache_path=VENV_CACHE_PATH
+            )
+            def some_venv_task():
+                import numpy as np
+
+                print(f"Some numpy stuff: {np.arange(6)}")
 
-        t = some_venv_task()
+            t = some_venv_task()
 
-        # Label is optional here, but it can help identify more complex 
branches
-        random_choice_venv >> Label(option) >> t >> join_venv
+            # Label is optional here, but it can help identify more complex 
branches
+            random_choice_venv >> Label(option) >> t >> join_venv

Reply via email to