This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bd094b2ef57b6ae526599e2df52de627766f7744
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Fri Mar 17 00:38:17 2023 +0530

    Fix warning in airflow tasks test command regarding absence of 
data_interval (#27106)
    
    * Fix warning in airflow tasks test command regarding absence of 
data_interval.
    
    * Use manual data interval in task commands
    
    * Update tests/cli/commands/test_task_command.py
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
    (cherry picked from commit 70680ded7a4056882008b019f5d1a8f559a301cd)
---
 airflow/cli/commands/task_command.py    |  7 ++++---
 tests/cli/commands/test_task_command.py | 24 ++++++++++++++++++++----
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/airflow/cli/commands/task_command.py 
b/airflow/cli/commands/task_command.py
index b514754f65..940f448c21 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -18,7 +18,6 @@
 """Task sub-commands."""
 from __future__ import annotations
 
-import datetime
 import importlib
 import json
 import logging
@@ -27,6 +26,7 @@ import textwrap
 from contextlib import contextmanager, redirect_stderr, redirect_stdout, 
suppress
 from typing import Generator, Union
 
+import pendulum
 from pendulum.parsing.exceptions import ParserError
 from sqlalchemy.orm.exc import NoResultFound
 from sqlalchemy.orm.session import Session
@@ -97,7 +97,7 @@ def _get_dag_run(
     """
     if not exec_date_or_run_id and not create_if_necessary:
         raise ValueError("Must provide `exec_date_or_run_id` if not 
`create_if_necessary`.")
-    execution_date: datetime.datetime | None = None
+    execution_date: pendulum.DateTime | None = None
     if exec_date_or_run_id:
         dag_run = dag.get_dagrun(run_id=exec_date_or_run_id, session=session)
         if dag_run:
@@ -122,7 +122,7 @@ def _get_dag_run(
     if execution_date is not None:
         dag_run_execution_date = execution_date
     else:
-        dag_run_execution_date = timezone.utcnow()
+        dag_run_execution_date = pendulum.instance(timezone.utcnow())
 
     if create_if_necessary == "memory":
         dag_run = DagRun(dag.dag_id, run_id=exec_date_or_run_id, 
execution_date=dag_run_execution_date)
@@ -132,6 +132,7 @@ def _get_dag_run(
             state=DagRunState.QUEUED,
             execution_date=dag_run_execution_date,
             run_id=_generate_temporary_run_id(),
+            
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_execution_date),
             session=session,
         )
         return dag_run, True
diff --git a/tests/cli/commands/test_task_command.py 
b/tests/cli/commands/test_task_command.py
index 864ea10408..6a1c4afdf0 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -87,8 +87,13 @@ class TestCliTasks:
 
         cls.dag = cls.dagbag.get_dag(cls.dag_id)
         cls.dagbag.sync_to_db()
+        data_interval = 
cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
         cls.dag_run = cls.dag.create_dagrun(
-            state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL, 
execution_date=DEFAULT_DATE
+            state=State.NONE,
+            run_id=cls.run_id,
+            run_type=DagRunType.MANUAL,
+            execution_date=DEFAULT_DATE,
+            data_interval=data_interval,
         )
 
     @classmethod
@@ -177,11 +182,14 @@ class TestCliTasks:
             dag = dagbag.get_dag("test_dags_folder")
             dagbag.sync_to_db(session=session)
 
+        execution_date = pendulum.now("UTC")
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=execution_date)
         dag.create_dagrun(
             state=State.NONE,
             run_id="abc123",
             run_type=DagRunType.MANUAL,
-            execution_date=pendulum.now("UTC"),
+            execution_date=execution_date,
+            data_interval=data_interval,
             session=session,
         )
         session.commit()
@@ -489,9 +497,11 @@ class TestCliTasks:
         task2 = dag2.get_task(task_id="print_the_context")
         default_date2 = timezone.datetime(2016, 1, 9)
         dag2.clear()
+        data_interval = 
dag2.timetable.infer_manual_data_interval(run_after=default_date2)
         dagrun = dag2.create_dagrun(
             state=State.RUNNING,
             execution_date=default_date2,
+            data_interval=data_interval,
             run_type=DagRunType.MANUAL,
             external_trigger=True,
         )
@@ -576,9 +586,12 @@ class TestLogsfromTaskRunCommand:
         self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
         self.parser = cli_parser.get_parser()
 
-        DagBag().get_dag(self.dag_id).create_dagrun(
+        dag = DagBag().get_dag(self.dag_id)
+        data_interval = 
dag.timetable.infer_manual_data_interval(run_after=self.execution_date)
+        dag.create_dagrun(
             run_id=self.run_id,
             execution_date=self.execution_date,
+            data_interval=data_interval,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
             run_type=DagRunType.MANUAL,
@@ -815,9 +828,12 @@ def test_context_with_run():
     task_args = ["tasks", "run", dag_id, task_id, "--local", 
execution_date_str]
     parser = cli_parser.get_parser()
 
-    DagBag().get_dag(dag_id).create_dagrun(
+    dag = DagBag().get_dag(dag_id)
+    data_interval = 
dag.timetable.infer_manual_data_interval(run_after=execution_date)
+    dag.create_dagrun(
         run_id=run_id,
         execution_date=execution_date,
+        data_interval=data_interval,
         start_date=timezone.utcnow(),
         state=State.RUNNING,
         run_type=DagRunType.MANUAL,

Reply via email to