Copilot commented on code in PR #64485:
URL: https://github.com/apache/airflow/pull/64485#discussion_r3025333352


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
                 output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
             except Exception as e:
                 log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+                safe_exc = Exception(f"{type(e).__name__}: 
{str(e)}\n{traceback.format_exc()}")
+                output.put((workload.ti.key, TaskInstanceState.FAILED, 
safe_exc))

Review Comment:
   This change introduces new behavior for how worker exceptions are sent 
across processes, but there’s no regression test asserting that a non-picklable 
exception (or a simulated pickling failure) doesn’t crash LocalExecutor and 
results in a FAILED event. Adding a unit test under 
`airflow-core/tests/unit/executors/test_local_executor.py` that triggers an 
exception with an unpicklable attribute (or uses a custom Exception whose 
`__reduce__` raises) would prevent regressions.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
                 output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
             except Exception as e:
                 log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+                safe_exc = Exception(f"{type(e).__name__}: 
{str(e)}\n{traceback.format_exc()}")
+                output.put((workload.ti.key, TaskInstanceState.FAILED, 
safe_exc))

Review Comment:
   Wrapping every failure as a plain `Exception` drops the original exception 
type and attributes (and breaks any downstream logic that might check for 
specific exception subclasses). If the goal is picklability, consider passing a 
string (or a small dataclass/tuple of strings) instead, or using a 
pickling-safe wrapper that preserves `type(e).__name__` separately from the 
message/traceback.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -30,6 +30,7 @@
 import multiprocessing.sharedctypes
 import os
 import sys
+import traceback
 from multiprocessing import Queue, SimpleQueue

Review Comment:
   `traceback` is only used just to build `safe_exc` in duplicated code paths. 
Consider extracting a small helper (e.g., `_safe_exception_for_ipc(e)`) to 
avoid duplication and keep formatting consistent between task/callback 
workloads.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -115,7 +117,8 @@ def _run_worker(
                 output.put((workload.callback.id, CallbackState.SUCCESS, None))
             except Exception as e:
                 log.exception("Callback execution failed")
-                output.put((workload.callback.id, CallbackState.FAILED, e))
+                safe_exc = Exception(f"{type(e).__name__}: 
{str(e)}\n{traceback.format_exc()}")
+                output.put((workload.callback.id, CallbackState.FAILED, 
safe_exc))

Review Comment:
   Same as for task failures: the callback failure `safe_exc` is queued but 
`_read_results()` ignores the third tuple element, so `SchedulerJobRunner` 
won’t receive any details and will store only a generic failure message for 
callbacks. Ensure the `exc/info` is forwarded via `change_state(..., info=exc)` 
(or equivalent) so callback.output and task mismatch logs can include it.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
                 output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
             except Exception as e:
                 log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+                safe_exc = Exception(f"{type(e).__name__}: 
{str(e)}\n{traceback.format_exc()}")
+                output.put((workload.ti.key, TaskInstanceState.FAILED, 
safe_exc))

Review Comment:
   The queued `safe_exc` isn’t currently propagated anywhere: `_read_results()` 
reads `(key, state, exc)` but calls `change_state(key, state)` without passing 
`exc`, so the traceback/message won’t reach the scheduler/event_buffer (and 
callback failure output will still end up as the generic "Execution failed"). 
Either pass the exception/info through to `change_state(..., info=...)` in 
`_read_results()`, or simplify the queue payload to avoid building 
`traceback.format_exc()` that will be discarded.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
                 output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
             except Exception as e:
                 log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+                safe_exc = Exception(f"{type(e).__name__}: 
{str(e)}\n{traceback.format_exc()}")
+                output.put((workload.ti.key, TaskInstanceState.FAILED, 
safe_exc))

Review Comment:
   `traceback.format_exc()` can generate very large strings; sending this 
through a multiprocessing queue for every failure can significantly increase 
CPU/memory and IPC overhead. Consider queueing a smaller, structured payload 
(e.g., exception type + message, and optionally a truncated traceback), or only 
including the full traceback when debug logging is enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to