Lee-W commented on code in PR #46407:
URL: https://github.com/apache/airflow/pull/46407#discussion_r1944150253


##########
airflow/cli/commands/remote_commands/dag_command.py:
##########
@@ -264,12 +265,16 @@ def dag_state(args, session: Session = NEW_SESSION) -> 
None:
 
     if not dag:
         raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
-    dr = session.scalar(select(DagRun).filter_by(dag_id=args.dag_id, 
logical_date=args.logical_date))
-    out = dr.state if dr else None
-    conf_out = ""
-    if out and dr.conf:
+    dr, _ = fetch_dag_run_from_run_id_or_logical_date_string(
+        dag_id=dag.dag_id,
+        value=args.logical_date_or_run_id,
+        session=session,
+    )
+    if dr and dr.conf:
         conf_out = ", " + json.dumps(dr.conf)

Review Comment:
   ```suggestion
           conf_out = f", {json.dumps(dr.conf)}"
   ```



##########
airflow/cli/utils.py:
##########
@@ -50,3 +55,45 @@ def print_export_output(command_type: str, exported_items: 
Collection, file: io.
         print(f"\n{len(exported_items)} {command_type} successfully 
exported.", file=sys.stderr)
     else:
         print(f"{len(exported_items)} {command_type} successfully exported to 
{file.name}.")
+
+
+def fetch_dag_run_from_run_id_or_logical_date_string(
+    *,
+    dag_id: str,
+    value: str,
+    session: Session,
+) -> tuple[DagRun | None, DateTime | None]:
+    """
+    Try to find a DAG run with a given string value.
+
+    The string value may be a run ID, or a logical date in string form. We first
+    try to use it as a run_id; if a run is found, it is returned as-is.
+
+    Otherwise, the string value is parsed into a datetime. If that works, it is
+    used to find a DAG run.
+
+    The return value is a two-tuple. The first item is the found DAG run (or
+    *None* if one cannot be found). The second is the parsed logical date. This
+    second value can be used to create a new run by the calling function when
+    one cannot be found here.
+    """
+    from pendulum.parsing.exceptions import ParserError
+    from sqlalchemy import select
+
+    from airflow.models.dag import DAG
+    from airflow.models.dagrun import DagRun
+    from airflow.utils import timezone
+
+    if dag_run := DAG.fetch_dagrun(dag_id=dag_id, run_id=value, 
session=session):
+        return dag_run, dag_run.logical_date  # type: ignore[return-value]

Review Comment:
   Why do we need the `# type: ignore[return-value]` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to