uranusjr commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643381620
##########
File path: tests/cli/commands/test_task_command.py
##########
@@ -62,6 +62,7 @@ class TestCliTasks(unittest.TestCase):
def setUpClass(cls):
cls.dagbag = DagBag(include_examples=True)
cls.parser = cli_parser.get_parser()
+ clear_db_runs()
Review comment:
We should also call this in `tearDown()` to keep things tidy for other
tests.
##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
get_dags,
suppress_logs_and_warning,
)
+from airflow.utils.dates import timezone
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+ """Get the task instance through DagRun.run_id, if that fails, get the TI
the old way"""
+ dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+ if not dag_run:
+ try:
+ execution_date = timezone.parse(exec_date_or_run_id)
+ # print("Warning: execution_date will be removed in
+ # tasks command in the future. Please use DagRun.run_id")
Review comment:
Is this a TODO for the future?
##########
File path: airflow/cli/commands/task_command.py
##########
@@ -42,9 +45,26 @@
get_dags,
suppress_logs_and_warning,
)
+from airflow.utils.dates import timezone
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.net import get_hostname
-from airflow.utils.session import create_session
+from airflow.utils.session import create_session, provide_session
+
+
+def _get_ti(dag, task, exec_date_or_run_id):
+ """Get the task instance through DagRun.run_id, if that fails, get the TI
the old way"""
+ dag_run = dag.get_dagrun(run_id=exec_date_or_run_id)
+ if not dag_run:
+ try:
+ execution_date = timezone.parse(exec_date_or_run_id)
+ # print("Warning: execution_date will be removed in
+ # tasks command in the future. Please use DagRun.run_id")
+ ti = TaskInstance(task, execution_date)
+ return ti
+ except (ParserError, TypeError):
+ raise AirflowException(f"DagRun with run_id: {exec_date_or_run_id}
not found")
+ ti = TaskInstance(task, execution_date=dag_run.execution_date)
+ return ti
Review comment:
This one tries to use the argument as a run_id first and falls back to
parsing it as an execution date otherwise, but…
##########
File path: airflow/models/dag.py
##########
@@ -905,22 +905,24 @@ def get_num_active_runs(self, external_trigger=None,
session=None):
return query.scalar()
@provide_session
- def get_dagrun(self, execution_date, session=None):
+ def get_dagrun(self, execution_date: str = None, run_id: str = None,
session=None):
Review comment:
If the argument can be None, the type should be `Optional[str]`. (In
this particular case, we could actually set the default to `""`, since neither
the execution date nor the run_id can be empty, but that might not be as
intuitive to read.)
##########
File path: airflow/cli/commands/task_command.py
##########
@@ -321,38 +340,39 @@ def _guess_debugger():
@cli_utils.action_logging
@suppress_logs_and_warning
-def task_states_for_dag_run(args):
+@provide_session
+def task_states_for_dag_run(args, session=None):
"""Get the status of all task instances in a DagRun"""
- with create_session() as session:
- tis = (
- session.query(
- TaskInstance.dag_id,
- TaskInstance.execution_date,
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.start_date,
- TaskInstance.end_date,
- )
- .filter(TaskInstance.dag_id == args.dag_id,
TaskInstance.execution_date == args.execution_date)
- .all()
+ try:
+ execution_date = timezone.parse(args.execution_date_or_run_id)
+ dag_run = (
+ session.query(DagRun)
+ .filter(DagRun.execution_date == execution_date, DagRun.dag_id ==
args.dag_id)
+ .one_or_none()
)
-
- if len(tis) == 0:
- raise AirflowException("DagRun does not exist.")
-
- AirflowConsole().print_as(
- data=tis,
- output=args.output,
- mapper=lambda ti: {
- "dag_id": ti.dag_id,
- "execution_date": ti.execution_date.isoformat(),
- "task_id": ti.task_id,
- "state": ti.state,
- "start_date": ti.start_date.isoformat() if ti.start_date else
"",
- "end_date": ti.end_date.isoformat() if ti.end_date else "",
- },
+ except (ParserError, TypeError):
+ dag_run = (
+ session.query(DagRun)
+ .filter(DagRun.run_id == args.execution_date_or_run_id,
DagRun.dag_id == args.dag_id)
+ .one_or_none()
Review comment:
The logic here is the other way around: it tries to parse the argument as an
execution date first and only looks it up as a run_id later. This feels wrong to me;
we should commit to doing this one way or the other, and use a common function for
it. There is also a potential edge case if a run_id can somehow be parsed as
a datetime string (admittedly extremely unlikely), which would cause pretty
terrible internal inconsistencies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]