This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c0e30cb3f8 Set parsing context dag_id in dag test command (#37606)
c0e30cb3f8 is described below
commit c0e30cb3f801d3873bd92165543c534715da3b22
Author: Sam Wheating <[email protected]>
AuthorDate: Wed Feb 21 16:29:43 2024 -0800
Set parsing context dag_id in dag test command (#37606)
---
airflow/cli/commands/dag_command.py | 4 +++-
tests/cli/commands/test_dag_command.py | 13 ++++++++++++
tests/dags/test_dag_parsing_context.py | 38 ++++++++++++++++++++++++++++++++++
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 52e4fac30e..50e145eda5 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -42,6 +42,7 @@ from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
+from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session
@@ -564,7 +565,8 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
except ValueError as e:
raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
execution_date = args.execution_date or timezone.utcnow()
- dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
+ with _airflow_parsing_context_manager(dag_id=args.dag_id):
+ dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
show_dagrun = args.show_dagrun
imgcat = args.imgcat_dagrun
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index ca47309721..36a0d178a0 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -886,6 +886,19 @@ class TestCliDags:
dag_command.dag_test(cli_args)
assert "data_interval" in mock__get_or_create_dagrun.call_args.kwargs
+ @mock.patch("airflow.models.dag._get_or_create_dagrun")
+ def test_dag_with_parsing_context(self, mock__get_or_create_dagrun):
+ """
+ airflow parsing context should be set when calling `dags test`.
+ """
+ cli_args = self.parser.parse_args(
+ ["dags", "test", "test_dag_parsing_context", DEFAULT_DATE.isoformat()]
+ )
+ dag_command.dag_test(cli_args)
+
+ # if dag_parsing_context is not set, this DAG will only have 1 task
+ assert len(mock__get_or_create_dagrun.call_args[1]["dag"].task_ids) == 2
+
def test_dag_test_run_inline_trigger(self, dag_maker):
now = timezone.utcnow()
trigger = DateTimeTrigger(moment=now)
diff --git a/tests/dags/test_dag_parsing_context.py b/tests/dags/test_dag_parsing_context.py
new file mode 100644
index 0000000000..3a72cbbc19
--- /dev/null
+++ b/tests/dags/test_dag_parsing_context.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.dag_parsing_context import get_parsing_context
+
+DAG_ID = "test_dag_parsing_context"
+
+current_dag_id = get_parsing_context().dag_id
+
+with DAG(
+ DAG_ID,
+ start_date=datetime(2024, 2, 21),
+ schedule="0 0 * * *",
+) as the_dag:
+ EmptyOperator(task_id="visible_task")
+
+ if current_dag_id == DAG_ID:
+ # this task will be invisible if the DAG ID is not properly set in the parsing context.
+ EmptyOperator(task_id="conditional_task")