ashb commented on a change in pull request #16030:
URL: https://github.com/apache/airflow/pull/16030#discussion_r643388171
##########
File path: airflow/cli/commands/task_command.py
##########
@@ -321,38 +340,39 @@ def _guess_debugger():
@cli_utils.action_logging
@suppress_logs_and_warning
-def task_states_for_dag_run(args):
+@provide_session
+def task_states_for_dag_run(args, session=None):
"""Get the status of all task instances in a DagRun"""
- with create_session() as session:
- tis = (
- session.query(
- TaskInstance.dag_id,
- TaskInstance.execution_date,
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.start_date,
- TaskInstance.end_date,
- )
- .filter(TaskInstance.dag_id == args.dag_id,
TaskInstance.execution_date == args.execution_date)
- .all()
+ try:
+ execution_date = timezone.parse(args.execution_date_or_run_id)
+ dag_run = (
+ session.query(DagRun)
+ .filter(DagRun.execution_date == execution_date, DagRun.dag_id ==
args.dag_id)
+ .one_or_none()
)
-
- if len(tis) == 0:
- raise AirflowException("DagRun does not exist.")
-
- AirflowConsole().print_as(
- data=tis,
- output=args.output,
- mapper=lambda ti: {
- "dag_id": ti.dag_id,
- "execution_date": ti.execution_date.isoformat(),
- "task_id": ti.task_id,
- "state": ti.state,
- "start_date": ti.start_date.isoformat() if ti.start_date else
"",
- "end_date": ti.end_date.isoformat() if ti.end_date else "",
- },
+ except (ParserError, TypeError):
+ dag_run = (
+ session.query(DagRun)
+ .filter(DagRun.run_id == args.execution_date_or_run_id,
DagRun.dag_id == args.dag_id)
+ .one_or_none()
Review comment:
So long as the lookup order is the same everywhere (and the "right" way
going forward should be run_id first, then fall back to execution_date),
the behaviour is at least consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org