This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c0e30cb3f8 Set parsing context dag_id in dag test command (#37606)
c0e30cb3f8 is described below

commit c0e30cb3f801d3873bd92165543c534715da3b22
Author: Sam Wheating <[email protected]>
AuthorDate: Wed Feb 21 16:29:43 2024 -0800

    Set parsing context dag_id in dag test command (#37606)
---
 airflow/cli/commands/dag_command.py    |  4 +++-
 tests/cli/commands/test_dag_command.py | 13 ++++++++++++
 tests/dags/test_dag_parsing_context.py | 38 ++++++++++++++++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 52e4fac30e..50e145eda5 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -42,6 +42,7 @@ from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils, timezone
 from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
+from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -564,7 +565,8 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
         except ValueError as e:
             raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
     execution_date = args.execution_date or timezone.utcnow()
-    dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
+    with _airflow_parsing_context_manager(dag_id=args.dag_id):
+        dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
     dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
     show_dagrun = args.show_dagrun
     imgcat = args.imgcat_dagrun
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index ca47309721..36a0d178a0 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -886,6 +886,19 @@ class TestCliDags:
         dag_command.dag_test(cli_args)
         assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs
 
+    @mock.patch("airflow.models.dag._get_or_create_dagrun")
+    def test_dag_with_parsing_context(self, mock__get_or_create_dagrun):
+        """
+        airflow parsing context should be set when calling `dags test`.
+        """
+        cli_args = self.parser.parse_args(
+            ["dags", "test", "test_dag_parsing_context", DEFAULT_DATE.isoformat()]
+        )
+        dag_command.dag_test(cli_args)
+
+        # if dag_parsing_context is not set, this DAG will only have 1 task
+        assert len(mock__get_or_create_dagrun.call_args[1]["dag"].task_ids) == 2
+
     def test_dag_test_run_inline_trigger(self, dag_maker):
         now = timezone.utcnow()
         trigger = DateTimeTrigger(moment=now)
diff --git a/tests/dags/test_dag_parsing_context.py b/tests/dags/test_dag_parsing_context.py
new file mode 100644
index 0000000000..3a72cbbc19
--- /dev/null
+++ b/tests/dags/test_dag_parsing_context.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.dag_parsing_context import get_parsing_context
+
+DAG_ID = "test_dag_parsing_context"
+
+current_dag_id = get_parsing_context().dag_id
+
+with DAG(
+    DAG_ID,
+    start_date=datetime(2024, 2, 21),
+    schedule="0 0 * * *",
+) as the_dag:
+    EmptyOperator(task_id="visible_task")
+
+    if current_dag_id == DAG_ID:
+        # this task will be invisible if the DAG ID is not properly set in the parsing context.
+        EmptyOperator(task_id="conditional_task")

Reply via email to