jedcunningham commented on code in PR #37262:
URL: https://github.com/apache/airflow/pull/37262#discussion_r1486529995
##########
airflow/www/views.py:
##########
@@ -3402,6 +3405,123 @@ def graph_data(self, session: Session = NEW_SESSION):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_instance_attributes")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @gzipped
+ @action_logging
+ @provide_session
+ def task_instance_attributes(self, session: Session = NEW_SESSION):
+ """Retrieve task instance attributes."""
+ dag_id = request.args.get("dag_id")
+ task_id = request.args.get("task_id")
+ run_id = request.args.get("run_id")
+ dag = get_airflow_app().dag_bag.get_dag(dag_id, session)
+
+ if not dag or task_id not in dag.task_ids:
+ # flash(
+ # f"Task [{dag_id}.{task_id}] doesn't seem to exist at the
moment", "error")
+ return {}
+ task = copy.copy(dag.get_task(task_id))
+ task.resolve_template_files()
+
+ ti: TaskInstance | None = session.scalar(
+ select(TaskInstance)
+ .options(
+ # HACK: Eager-load relationships. This is needed because
+ # multiple properties mis-use provide_session() that destroys
+ # the session object ti is bounded to.
+ joinedload(TaskInstance.queued_by_job, innerjoin=False),
+ joinedload(TaskInstance.trigger, innerjoin=False),
+ )
+ .filter_by(run_id=run_id, dag_id=dag_id, task_id=task_id)
+ )
+ if ti is None:
+ ti_attrs: list[tuple[str, Any]] | None = None
+ else:
+ ti.refresh_from_task(task)
+ ti_attrs_to_skip = [
+ "dag_id",
+ "dag_model",
+ "dag_run",
+ "run_id",
+ "task_id",
+ "task_instance_notedag_run",
Review Comment:
```suggestion
"dag_run",
```
I assume we want the ti note.
##########
airflow/www/views.py:
##########
@@ -3402,6 +3405,123 @@ def graph_data(self, session: Session = NEW_SESSION):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_instance_attributes")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @gzipped
+ @action_logging
+ @provide_session
+ def task_instance_attributes(self, session: Session = NEW_SESSION):
+ """Retrieve task instance attributes."""
+ dag_id = request.args.get("dag_id")
+ task_id = request.args.get("task_id")
+ run_id = request.args.get("run_id")
+ dag = get_airflow_app().dag_bag.get_dag(dag_id, session)
+
+ if not dag or task_id not in dag.task_ids:
+ # flash(
+ # f"Task [{dag_id}.{task_id}] doesn't seem to exist at the
moment", "error")
+ return {}
+ task = copy.copy(dag.get_task(task_id))
+ task.resolve_template_files()
+
+ ti: TaskInstance | None = session.scalar(
+ select(TaskInstance)
+ .options(
+ # HACK: Eager-load relationships. This is needed because
+ # multiple properties mis-use provide_session() that destroys
+ # the session object ti is bounded to.
+ joinedload(TaskInstance.queued_by_job, innerjoin=False),
+ joinedload(TaskInstance.trigger, innerjoin=False),
+ )
+ .filter_by(run_id=run_id, dag_id=dag_id, task_id=task_id)
+ )
+ if ti is None:
+ ti_attrs: list[tuple[str, Any]] | None = None
+ else:
+ ti.refresh_from_task(task)
+ ti_attrs_to_skip = [
+ "dag_id",
+ "dag_model",
+ "dag_run",
+ "run_id",
+ "task_id",
+ "task_instance_notedag_run",
+ "duration",
+ "start_date",
+ "state",
+ "end_date",
+ "map_index",
+ "execution_date",
+ "operator",
+ "operator_name",
+ "key",
+ "mark_success_url",
+ "log",
+ "log_url",
+ "task",
+ "trigger",
+ "triggerer_job",
+ "try_number",
+ "previous_ti",
+ "previous_ti_success",
+ "previous_start_date_success",
+ "queued_dttm",
+ ]
+ # Some fields on TI are deprecated, but we don't want those
warnings here.
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", RemovedInAirflow3Warning)
+ all_ti_attrs = (
+ # fetching the value of _try_number to be shown under name
try_number in UI
+ (name, getattr(ti, "_try_number" if name == "try_number"
else name))
+ for name in dir(ti)
+ if not name.startswith("_") and name not in
ti_attrs_to_skip
+ )
+ ti_attrs = sorted((name, attr) for name, attr in all_ti_attrs if
not callable(attr))
Review Comment:
You could check callable in the loop to avoid looping again.
##########
airflow/www/views.py:
##########
@@ -3402,6 +3405,123 @@ def graph_data(self, session: Session = NEW_SESSION):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_instance_attributes")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @gzipped
+ @action_logging
+ @provide_session
+ def task_instance_attributes(self, session: Session = NEW_SESSION):
+ """Retrieve task instance attributes."""
+ dag_id = request.args.get("dag_id")
+ task_id = request.args.get("task_id")
+ run_id = request.args.get("run_id")
Review Comment:
missing map_index
##########
airflow/www/views.py:
##########
@@ -3402,6 +3405,123 @@ def graph_data(self, session: Session = NEW_SESSION):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_instance_attributes")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @gzipped
+ @action_logging
+ @provide_session
+ def task_instance_attributes(self, session: Session = NEW_SESSION):
+ """Retrieve task instance attributes."""
+ dag_id = request.args.get("dag_id")
+ task_id = request.args.get("task_id")
+ run_id = request.args.get("run_id")
+ dag = get_airflow_app().dag_bag.get_dag(dag_id, session)
+
+ if not dag or task_id not in dag.task_ids:
+ # flash(
+ # f"Task [{dag_id}.{task_id}] doesn't seem to exist at the
moment", "error")
+ return {}
+ task = copy.copy(dag.get_task(task_id))
+ task.resolve_template_files()
+
+ ti: TaskInstance | None = session.scalar(
+ select(TaskInstance)
+ .options(
+ # HACK: Eager-load relationships. This is needed because
+ # multiple properties mis-use provide_session() that destroys
+ # the session object ti is bounded to.
+ joinedload(TaskInstance.queued_by_job, innerjoin=False),
+ joinedload(TaskInstance.trigger, innerjoin=False),
+ )
+ .filter_by(run_id=run_id, dag_id=dag_id, task_id=task_id)
+ )
+ if ti is None:
+ ti_attrs: list[tuple[str, Any]] | None = None
+ else:
+ ti.refresh_from_task(task)
+ ti_attrs_to_skip = [
+ "dag_id",
+ "dag_model",
+ "dag_run",
+ "run_id",
+ "task_id",
+ "task_instance_notedag_run",
+ "duration",
+ "start_date",
Review Comment:
Why don't we want these? Since we already have them?
##########
airflow/www/views.py:
##########
@@ -3402,6 +3405,123 @@ def graph_data(self, session: Session = NEW_SESSION):
{"Content-Type": "application/json; charset=utf-8"},
)
+ @expose("/object/task_instance_attributes")
+ @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+ @gzipped
+ @action_logging
+ @provide_session
+ def task_instance_attributes(self, session: Session = NEW_SESSION):
+ """Retrieve task instance attributes."""
+ dag_id = request.args.get("dag_id")
+ task_id = request.args.get("task_id")
+ run_id = request.args.get("run_id")
+ dag = get_airflow_app().dag_bag.get_dag(dag_id, session)
+
+ if not dag or task_id not in dag.task_ids:
+ # flash(
+ # f"Task [{dag_id}.{task_id}] doesn't seem to exist at the
moment", "error")
+ return {}
+ task = copy.copy(dag.get_task(task_id))
+ task.resolve_template_files()
+
+ ti: TaskInstance | None = session.scalar(
+ select(TaskInstance)
+ .options(
+ # HACK: Eager-load relationships. This is needed because
+ # multiple properties mis-use provide_session() that destroys
+ # the session object ti is bounded to.
+ joinedload(TaskInstance.queued_by_job, innerjoin=False),
+ joinedload(TaskInstance.trigger, innerjoin=False),
+ )
+ .filter_by(run_id=run_id, dag_id=dag_id, task_id=task_id)
+ )
+ if ti is None:
+ ti_attrs: list[tuple[str, Any]] | None = None
+ else:
+ ti.refresh_from_task(task)
+ ti_attrs_to_skip = [
+ "dag_id",
+ "dag_model",
+ "dag_run",
+ "run_id",
+ "task_id",
+ "task_instance_notedag_run",
+ "duration",
+ "start_date",
+ "state",
+ "end_date",
+ "map_index",
+ "execution_date",
+ "operator",
+ "operator_name",
+ "key",
+ "mark_success_url",
+ "log",
+ "log_url",
+ "task",
+ "trigger",
+ "triggerer_job",
+ "try_number",
+ "previous_ti",
+ "previous_ti_success",
+ "previous_start_date_success",
+ "queued_dttm",
+ ]
+ # Some fields on TI are deprecated, but we don't want those
warnings here.
Review Comment:
We probably shouldn't include deprecated fields?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]