Copilot commented on code in PR #64485:
URL: https://github.com/apache/airflow/pull/64485#discussion_r3025333352
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
except Exception as e:
log.exception("Task execution failed.")
- output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+ safe_exc = Exception(f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
+ output.put((workload.ti.key, TaskInstanceState.FAILED, safe_exc))
Review Comment:
This change introduces new behavior for how worker exceptions are sent
across processes, but there’s no regression test asserting that a non-picklable
exception (or a simulated pickling failure) doesn’t crash LocalExecutor and
results in a FAILED event. Adding a unit test under
`airflow-core/tests/unit/executors/test_local_executor.py` that triggers an
exception with an unpicklable attribute (or uses a custom Exception whose
`__reduce__` raises) would prevent regressions.
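A minimal sketch of such a regression test could look like the following. Note that the test name, and the assumption that the worker wraps the exception exactly as in the diff above, are illustrative; the real test would exercise `_run_worker` through the executor's queues rather than inline code.

```python
import pickle
import traceback


class UnpicklableError(Exception):
    """An exception whose __reduce__ raises, simulating a pickling failure."""

    def __reduce__(self):
        raise TypeError("cannot pickle UnpicklableError")


def test_unpicklable_exception_is_wrapped():
    err = UnpicklableError("boom")

    # The raw exception cannot cross a multiprocessing queue...
    try:
        pickle.dumps(err)
        raised = False
    except TypeError:
        raised = True
    assert raised

    # ...but the wrapped form (as built in the diff) round-trips fine.
    try:
        raise err
    except Exception as e:
        safe_exc = Exception(f"{type(e).__name__}: {e}\n{traceback.format_exc()}")
    restored = pickle.loads(pickle.dumps(safe_exc))
    assert restored.args[0].startswith("UnpicklableError")
```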
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
except Exception as e:
log.exception("Task execution failed.")
- output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+ safe_exc = Exception(f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
+ output.put((workload.ti.key, TaskInstanceState.FAILED, safe_exc))
Review Comment:
Wrapping every failure as a plain `Exception` drops the original exception
type and attributes (and breaks any downstream logic that might check for
specific exception subclasses). If the goal is picklability, consider passing a
string (or a small dataclass/tuple of strings) instead, or using a
pickling-safe wrapper that preserves `type(e).__name__` separately from the
message/traceback.
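One possible shape for such a payload — a frozen dataclass of plain strings, so it always pickles while keeping the original type name queryable. The class name and fields here are hypothetical, not part of the PR:

```python
from __future__ import annotations

import traceback
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecutorFailureInfo:
    """Pickling-safe summary of a worker-side exception (sketch, not the PR's code)."""

    exc_type: str
    message: str
    formatted_traceback: str

    @classmethod
    def from_exception(cls, e: BaseException) -> ExecutorFailureInfo:
        # Must be called while the exception is being handled, so that
        # traceback.format_exc() sees the active traceback.
        return cls(
            exc_type=type(e).__name__,
            message=str(e),
            formatted_traceback=traceback.format_exc(),
        )

    def __str__(self) -> str:
        return f"{self.exc_type}: {self.message}\n{self.formatted_traceback}"
```

Downstream consumers can then still branch on `info.exc_type` by name, which a plain `Exception` wrapper makes impossible.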
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -30,6 +30,7 @@
import multiprocessing.sharedctypes
import os
import sys
+import traceback
from multiprocessing import Queue, SimpleQueue
Review Comment:
`traceback` is only used to build `safe_exc`, in duplicated code paths.
Consider extracting a small helper (e.g., `_safe_exception_for_ipc(e)`) to
avoid duplication and keep formatting consistent between task and callback
workloads.
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -115,7 +117,8 @@ def _run_worker(
output.put((workload.callback.id, CallbackState.SUCCESS, None))
except Exception as e:
log.exception("Callback execution failed")
- output.put((workload.callback.id, CallbackState.FAILED, e))
+ safe_exc = Exception(f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
+ output.put((workload.callback.id, CallbackState.FAILED, safe_exc))
Review Comment:
Same as for task failures: the callback failure `safe_exc` is queued but
`_read_results()` ignores the third tuple element, so `SchedulerJobRunner`
won’t receive any details and will store only a generic failure message for
callbacks. Ensure the `exc/info` is forwarded via `change_state(..., info=exc)`
(or equivalent) so callback.output and task mismatch logs can include it.
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
except Exception as e:
log.exception("Task execution failed.")
- output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+ safe_exc = Exception(f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
+ output.put((workload.ti.key, TaskInstanceState.FAILED, safe_exc))
Review Comment:
The queued `safe_exc` isn’t currently propagated anywhere: `_read_results()`
reads `(key, state, exc)` but calls `change_state(key, state)` without passing
`exc`, so the traceback/message won’t reach the scheduler/event_buffer (and
callback failure output will still end up as the generic "Execution failed").
Either pass the exception/info through to `change_state(..., info=...)` in
`_read_results()`, or simplify the queue payload to avoid building
`traceback.format_exc()` that will be discarded.
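A minimal runnable analogue of the forwarding fix. The names here (`read_results`, `change_state`, `event_buffer`) mirror this comment's description of the executor but are simplified stand-ins, not the real `LocalExecutor`/`BaseExecutor` API:

```python
from queue import SimpleQueue

event_buffer: dict = {}


def change_state(key, state, info=None):
    # In the real executor, change_state records state plus info in the event
    # buffer; calling it without info is exactly what loses the worker-side
    # traceback.
    event_buffer[key] = (state, info)


def read_results(result_queue: SimpleQueue) -> None:
    while not result_queue.empty():
        key, state, exc = result_queue.get()
        # The fix: forward the third tuple element instead of change_state(key, state)
        change_state(key, state, info=exc)


q: SimpleQueue = SimpleQueue()
q.put(("ti-key", "failed", Exception("ValueError: boom\n<traceback>")))
read_results(q)
```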
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -106,7 +107,8 @@ def _run_worker(
output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
except Exception as e:
log.exception("Task execution failed.")
- output.put((workload.ti.key, TaskInstanceState.FAILED, e))
+ safe_exc = Exception(f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()}")
+ output.put((workload.ti.key, TaskInstanceState.FAILED, safe_exc))
Review Comment:
`traceback.format_exc()` can generate very large strings; sending this
through a multiprocessing queue for every failure can significantly increase
CPU/memory and IPC overhead. Consider queueing a smaller, structured payload
(e.g., exception type + message, and optionally a truncated traceback), or only
including the full traceback when debug logging is enabled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]