This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bd094b2ef57b6ae526599e2df52de627766f7744 Author: Karthikeyan Singaravelan <[email protected]> AuthorDate: Fri Mar 17 00:38:17 2023 +0530 Fix warning in airflow tasks test command regarding absence of data_interval (#27106) * Fix warning in airflow tasks test command regarding absence of data_interval. * Use manual data interval in task commands * Update tests/cli/commands/test_task_command.py --------- Co-authored-by: Tzu-ping Chung <[email protected]> Co-authored-by: Jarek Potiuk <[email protected]> (cherry picked from commit 70680ded7a4056882008b019f5d1a8f559a301cd) --- airflow/cli/commands/task_command.py | 7 ++++--- tests/cli/commands/test_task_command.py | 24 ++++++++++++++++++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index b514754f65..940f448c21 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -18,7 +18,6 @@ """Task sub-commands.""" from __future__ import annotations -import datetime import importlib import json import logging @@ -27,6 +26,7 @@ import textwrap from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress from typing import Generator, Union +import pendulum from pendulum.parsing.exceptions import ParserError from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.orm.session import Session @@ -97,7 +97,7 @@ def _get_dag_run( """ if not exec_date_or_run_id and not create_if_necessary: raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.") - execution_date: datetime.datetime | None = None + execution_date: pendulum.DateTime | None = None if exec_date_or_run_id: dag_run = dag.get_dagrun(run_id=exec_date_or_run_id, session=session) if dag_run: @@ -122,7 +122,7 @@ def _get_dag_run( if execution_date is not None: dag_run_execution_date = execution_date else: - dag_run_execution_date = timezone.utcnow() + dag_run_execution_date = pendulum.instance(timezone.utcnow()) if create_if_necessary == "memory": dag_run = DagRun(dag.dag_id, run_id=exec_date_or_run_id, execution_date=dag_run_execution_date) @@ -132,6 +132,7 @@ def _get_dag_run( state=DagRunState.QUEUED, execution_date=dag_run_execution_date, run_id=_generate_temporary_run_id(), + data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date), session=session, ) return dag_run, True diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 864ea10408..6a1c4afdf0 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -87,8 +87,13 @@ class TestCliTasks: cls.dag = cls.dagbag.get_dag(cls.dag_id) cls.dagbag.sync_to_db() + data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE) cls.dag_run = cls.dag.create_dagrun( - state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE + state=State.NONE, + run_id=cls.run_id, + run_type=DagRunType.MANUAL, + execution_date=DEFAULT_DATE, + data_interval=data_interval, ) @classmethod @@ -177,11 +182,14 @@ class TestCliTasks: dag = dagbag.get_dag("test_dags_folder") dagbag.sync_to_db(session=session) + execution_date = pendulum.now("UTC") + data_interval = dag.timetable.infer_manual_data_interval(run_after=execution_date) dag.create_dagrun( state=State.NONE, run_id="abc123", run_type=DagRunType.MANUAL, - execution_date=pendulum.now("UTC"), + execution_date=execution_date, + data_interval=data_interval, session=session, ) session.commit() @@ -489,9 +497,11 @@ class TestCliTasks: task2 = dag2.get_task(task_id="print_the_context") default_date2 = timezone.datetime(2016, 1, 9) dag2.clear() + data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2) dagrun = dag2.create_dagrun( state=State.RUNNING, execution_date=default_date2, + data_interval=data_interval, run_type=DagRunType.MANUAL, external_trigger=True, ) @@ -576,9 +586,12 @@ class TestLogsfromTaskRunCommand: self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename) self.parser = cli_parser.get_parser() - DagBag().get_dag(self.dag_id).create_dagrun( + dag = DagBag().get_dag(self.dag_id) + data_interval = dag.timetable.infer_manual_data_interval(run_after=self.execution_date) + dag.create_dagrun( run_id=self.run_id, execution_date=self.execution_date, + data_interval=data_interval, start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL, @@ -815,9 +828,12 @@ def test_context_with_run(): task_args = ["tasks", "run", dag_id, task_id, "--local", execution_date_str] parser = cli_parser.get_parser() - DagBag().get_dag(dag_id).create_dagrun( + dag = DagBag().get_dag(dag_id) + data_interval = dag.timetable.infer_manual_data_interval(run_after=execution_date) + dag.create_dagrun( run_id=run_id, execution_date=execution_date, + data_interval=data_interval, start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL,
