This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9de80ab8f76 AIP-72: Add logging for exception in Task Runner (#45502)
9de80ab8f76 is described below
commit 9de80ab8f76798e0d3daedf6565832bc36650961
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 9 13:28:36 2025 +0530
AIP-72: Add logging for exception in Task Runner (#45502)
---
task_sdk/src/airflow/sdk/execution_time/task_runner.py | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index fd6155d91f8..1c7c96c3d5f 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -448,6 +448,8 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# etc
msg = TaskState(state=TerminalTIState.SUCCESS,
end_date=datetime.now(tz=timezone.utc))
except TaskDeferred as defer:
+ # TODO: Should we use structlog.bind_contextvars here for dag_id, task_id & run_id?
+ log.info("Pausing task as DEFERRED. ", dag_id=ti.dag_id,
task_id=ti.task_id, run_id=ti.run_id)
classpath, trigger_kwargs = defer.trigger.serialize()
next_method = defer.method_name
defer_timeout = defer.timeout
@@ -457,19 +459,22 @@ def run(ti: RuntimeTaskInstance, log: Logger):
next_method=next_method,
trigger_timeout=defer_timeout,
)
- except AirflowSkipException:
+ except AirflowSkipException as e:
+ if e.args:
+ log.info("Skipping task.", reason=e.args[0])
msg = TaskState(
state=TerminalTIState.SKIPPED,
end_date=datetime.now(tz=timezone.utc),
)
except AirflowRescheduleException as reschedule:
+ log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
msg = RescheduleTask(
reschedule_date=reschedule.reschedule_date,
end_date=datetime.now(tz=timezone.utc)
)
except (AirflowFailException, AirflowSensorTimeout):
# If AirflowFailException is raised, task should not retry.
# If a sensor in reschedule mode reaches timeout, task should not retry.
-
+ log.exception("Task failed with exception")
# TODO: Handle fail_stop here: https://github.com/apache/airflow/issues/44951
# TODO: Handle addition to Log table: https://github.com/apache/airflow/issues/44952
msg = TaskState(
@@ -479,6 +484,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# TODO: Run task failure callbacks here
except (AirflowTaskTimeout, AirflowException):
# We should allow retries if the task has defined it.
+ log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
@@ -486,6 +492,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# TODO: Run task failure callbacks here
except AirflowException:
# TODO: handle the case of up_for_retry here
+ log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
@@ -494,6 +501,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# External state updates are already handled with `ti_heartbeat` and will be
# updated already be another UI API. So, these exceptions should ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
+ log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAIL_WITHOUT_RETRY,
end_date=datetime.now(tz=timezone.utc),
@@ -501,12 +509,14 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# TODO: Run task failure callbacks here
except SystemExit:
# SystemExit needs to be retried if they are eligible.
+ log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
# TODO: Run task failure callbacks here
except BaseException:
+ log.exception("Task failed with exception")
# TODO: Run task failure callbacks here
msg = TaskState(state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc))
if msg: