This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 00922c0bf1d2a7ad9c1b4050bb02d3b2d0c487a6 Author: Sam Wheating <[email protected]> AuthorDate: Wed Feb 21 16:29:43 2024 -0800 Set parsing context dag_id in dag test command (#37606) (cherry picked from commit c0e30cb3f801d3873bd92165543c534715da3b22) --- airflow/cli/commands/dag_command.py | 4 +++- tests/cli/commands/test_dag_command.py | 13 ++++++++++++ tests/dags/test_dag_parsing_context.py | 38 ++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index a9d6aaf342..735d66a8a8 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -42,6 +42,7 @@ from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning +from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager from airflow.utils.dot_renderer import render_dag, render_dag_dependencies from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, create_session, provide_session @@ -514,7 +515,8 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No except ValueError as e: raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}") execution_date = args.execution_date or timezone.utcnow() - dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id) + with _airflow_parsing_context_manager(dag_id=args.dag_id): + dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id) dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, session=session) show_dagrun = args.show_dagrun imgcat = args.imgcat_dagrun diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 1c1a7ff650..a209fd33a3 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -837,6 +837,19 @@ class TestCliDags: dag_command.dag_test(cli_args) assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs + @mock.patch("airflow.models.dag._get_or_create_dagrun") + def test_dag_with_parsing_context(self, mock__get_or_create_dagrun): + """ + airflow parsing context should be set when calling `dags test`. + """ + cli_args = self.parser.parse_args( + ["dags", "test", "test_dag_parsing_context", DEFAULT_DATE.isoformat()] + ) + dag_command.dag_test(cli_args) + + # if dag_parsing_context is not set, this DAG will only have 1 task + assert len(mock__get_or_create_dagrun.call_args[1]["dag"].task_ids) == 2 + def test_dag_test_run_trigger(self, dag_maker): now = timezone.utcnow() trigger = DateTimeTrigger(moment=now) diff --git a/tests/dags/test_dag_parsing_context.py b/tests/dags/test_dag_parsing_context.py new file mode 100644 index 0000000000..3a72cbbc19 --- /dev/null +++ b/tests/dags/test_dag_parsing_context.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils.dag_parsing_context import get_parsing_context + +DAG_ID = "test_dag_parsing_context" + +current_dag_id = get_parsing_context().dag_id + +with DAG( + DAG_ID, + start_date=datetime(2024, 2, 21), + schedule="0 0 * * *", +) as the_dag: + EmptyOperator(task_id="visible_task") + + if current_dag_id == DAG_ID: + # this task will be invisible if the DAG ID is not properly set in the parsing context. + EmptyOperator(task_id="conditional_task")
