This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9de80ab8f76 AIP-72: Add logging for exception in Task Runner (#45502)
9de80ab8f76 is described below

commit 9de80ab8f76798e0d3daedf6565832bc36650961
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 9 13:28:36 2025 +0530

    AIP-72: Add logging for exception in Task Runner (#45502)
---
 task_sdk/src/airflow/sdk/execution_time/task_runner.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
index fd6155d91f8..1c7c96c3d5f 100644
--- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -448,6 +448,8 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         #   etc
         msg = TaskState(state=TerminalTIState.SUCCESS, 
end_date=datetime.now(tz=timezone.utc))
     except TaskDeferred as defer:
+        # TODO: Should we use structlog.bind_contextvars here for dag_id, task_id & run_id?
+        log.info("Pausing task as DEFERRED. ", dag_id=ti.dag_id, 
task_id=ti.task_id, run_id=ti.run_id)
         classpath, trigger_kwargs = defer.trigger.serialize()
         next_method = defer.method_name
         defer_timeout = defer.timeout
@@ -457,19 +459,22 @@ def run(ti: RuntimeTaskInstance, log: Logger):
             next_method=next_method,
             trigger_timeout=defer_timeout,
         )
-    except AirflowSkipException:
+    except AirflowSkipException as e:
+        if e.args:
+            log.info("Skipping task.", reason=e.args[0])
         msg = TaskState(
             state=TerminalTIState.SKIPPED,
             end_date=datetime.now(tz=timezone.utc),
         )
     except AirflowRescheduleException as reschedule:
+        log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
         msg = RescheduleTask(
             reschedule_date=reschedule.reschedule_date, 
end_date=datetime.now(tz=timezone.utc)
         )
     except (AirflowFailException, AirflowSensorTimeout):
         # If AirflowFailException is raised, task should not retry.
         # If a sensor in reschedule mode reaches timeout, task should not retry.
-
+        log.exception("Task failed with exception")
         # TODO: Handle fail_stop here: https://github.com/apache/airflow/issues/44951
         # TODO: Handle addition to Log table: https://github.com/apache/airflow/issues/44952
         msg = TaskState(
@@ -479,6 +484,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # TODO: Run task failure callbacks here
     except (AirflowTaskTimeout, AirflowException):
         # We should allow retries if the task has defined it.
+        log.exception("Task failed with exception")
         msg = TaskState(
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
@@ -486,6 +492,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # TODO: Run task failure callbacks here
     except AirflowException:
         # TODO: handle the case of up_for_retry here
+        log.exception("Task failed with exception")
         msg = TaskState(
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
@@ -494,6 +501,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # External state updates are already handled with `ti_heartbeat` and will be
         # updated already be another UI API. So, these exceptions should ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
+        log.exception("Task failed with exception")
         msg = TaskState(
             state=TerminalTIState.FAIL_WITHOUT_RETRY,
             end_date=datetime.now(tz=timezone.utc),
@@ -501,12 +509,14 @@ def run(ti: RuntimeTaskInstance, log: Logger):
         # TODO: Run task failure callbacks here
     except SystemExit:
         # SystemExit needs to be retried if they are eligible.
+        log.exception("Task failed with exception")
         msg = TaskState(
             state=TerminalTIState.FAILED,
             end_date=datetime.now(tz=timezone.utc),
         )
         # TODO: Run task failure callbacks here
     except BaseException:
+        log.exception("Task failed with exception")
         # TODO: Run task failure callbacks here
         msg = TaskState(state=TerminalTIState.FAILED, 
end_date=datetime.now(tz=timezone.utc))
     if msg:

Reply via email to