kaxil commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2991911032
##########
shared/module_loading/src/airflow_shared/module_loading/__init__.py:
##########
@@ -43,6 +44,18 @@
from types import ModuleType
+def accepts_context(callback: Callable) -> bool:
+ """Check if callback accepts a 'context' parameter, *args, or **kwargs."""
+ try:
+ sig = inspect.signature(callback)
+ except (ValueError, TypeError):
+ return True
+ params = sig.parameters
+ return "context" in params or any(
+ p.kind in (inspect.Parameter.VAR_KEYWORD,
inspect.Parameter.VAR_POSITIONAL) for p in params.values()
Review Comment:
Adding `VAR_POSITIONAL` (`*args`) is a behavioral change from the original
`_accepts_context`, which only matched a named `context` parameter or `**kwargs`.
`CallbackTrigger.run()` calls `callback(**self.callback_kwargs,
context=context)` when `accepts_context` returns True. A function with only
`*args` can't accept keyword arguments, so this would raise TypeError.
This was previously safe because `_accepts_context` didn't match `*args`.
##########
task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py:
##########
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Tests for the callback supervisor module."""
+
+from __future__ import annotations
+
+import pytest
+import structlog
+
+from airflow.sdk.execution_time.callback_supervisor import execute_callback
+
+
+def callback_no_args():
+ """A simple callback that takes no arguments."""
+ return "ok"
+
+
+def callback_with_kwargs(arg1, arg2):
+ """A callback that accepts keyword arguments."""
+ return f"{arg1}-{arg2}"
+
+
+def callback_that_raises():
+ """A callback that always raises."""
+ raise ValueError("something went wrong")
+
+
+class CallableClass:
+ """A class that returns a callable instance (like BaseNotifier)."""
+
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+
+ def __call__(self, context):
+ return "notified"
+
+
[email protected]
Review Comment:
These tests follow a uniform pattern: call `execute_callback(path, kwargs, log)`,
then assert `(success, error_contains)`. That makes them a good candidate for
`@pytest.mark.parametrize`.
The same applies to `TestExecuteCallbackWorkload` in `test_base_executor.py`.
The `log` fixture is just `structlog.get_logger()` with no setup/teardown, so
it could be inlined.
##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of callback workloads."""
+
+from __future__ import annotations
+
+import time
+from importlib import import_module
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, Protocol
+
+import attrs
+import structlog
+from pydantic import TypeAdapter
+
+from airflow.sdk.execution_time.supervisor import (
+ MIN_HEARTBEAT_INTERVAL,
+ SOCKET_CLEANUP_TIMEOUT,
+ WatchedSubprocess,
+)
+
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger
+ from typing_extensions import Self
+
+ # Core (airflow.executors.workloads.base.BundleInfo) and SDK
(airflow.sdk.api.datamodels._generated.BundleInfo)
+ # are structurally identical, but MyPy treats them as different types.
This Protocol makes MyPy happy.
+ class _BundleInfoLike(Protocol):
+ name: str
+ version: str | None
+
+
+__all__ = ["CallbackSubprocess", "supervise_callback"]
+
+log: FilteringBoundLogger =
structlog.get_logger(logger_name="callback_supervisor")
+
+
+def execute_callback(
+ callback_path: str,
+ callback_kwargs: dict,
+ log,
+) -> tuple[bool, str | None]:
+ """
+ Execute a callback function by importing and calling it, returning the
success state.
+
+ Supports two patterns:
+ 1. Functions - called directly with kwargs
+ 2. Classes that return callable instances (like BaseNotifier) -
instantiated then called with context
+
+ Example:
+ # Function callback
+ execute_callback("my_module.alert_func", {"msg": "Alert!", "context":
{...}}, log)
+
+ # Notifier callback
+ execute_callback("airflow.providers.slack...SlackWebhookNotifier",
{"text": "Alert!"}, log)
+
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log: Logger instance for recording execution.
+ :return: Tuple of (success: bool, error_message: str | None)
+ """
+ from airflow.sdk._shared.module_loading import accepts_context
+
+ if not callback_path:
+ return False, "Callback path not found."
+
+ try:
+ # Import the callback callable
+ # Expected format: "module.path.to.function_or_class"
+ module_path, function_name = callback_path.rsplit(".", 1)
+ module = import_module(module_path)
+ callback_callable = getattr(module, function_name)
+
+ log.debug("Executing callback %s(%s)...", callback_path,
callback_kwargs)
+
+ kwargs_without_context = {k: v for k, v in callback_kwargs.items() if
k != "context"}
+
+ # Call the callable with all kwargs if it accepts context, otherwise
strip context.
+ if accepts_context(callback_callable):
+ result = callback_callable(**callback_kwargs)
+ else:
+ result = callback_callable(**kwargs_without_context)
+
+ # If the callback was a class then it is now instantiated and
callable, call it.
+ if callable(result):
+ if accepts_context(result):
+ result = result(**callback_kwargs)
+ else:
+ result = result(**kwargs_without_context)
+
Review Comment:
The old code called `result(context)` positionally. The new code calls
`result(**callback_kwargs)`, which passes all kwargs.
For `BaseNotifier.__call__(self, *args)`, keyword arguments raise TypeError.
The test's `CallableClass.__call__(self, context)` also breaks:
`instance(msg="alert")` gets "unexpected keyword argument 'msg'" since
`__call__` expects `context`, not `msg`.
`test_callable_class_pattern` should fail for this reason.
Suggest preserving the old positional call:
```python
context = callback_kwargs.get("context")
if callable(result):
result = result(context)
```
##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of callback workloads."""
+
+from __future__ import annotations
+
+import time
+from importlib import import_module
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, Protocol
+
+import attrs
+import structlog
+from pydantic import TypeAdapter
+
+from airflow.sdk.execution_time.supervisor import (
+ MIN_HEARTBEAT_INTERVAL,
+ SOCKET_CLEANUP_TIMEOUT,
+ WatchedSubprocess,
+)
+
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger
+ from typing_extensions import Self
+
+ # Core (airflow.executors.workloads.base.BundleInfo) and SDK
(airflow.sdk.api.datamodels._generated.BundleInfo)
+ # are structurally identical, but MyPy treats them as different types.
This Protocol makes MyPy happy.
+ class _BundleInfoLike(Protocol):
+ name: str
+ version: str | None
+
+
+__all__ = ["CallbackSubprocess", "supervise_callback"]
+
+log: FilteringBoundLogger =
structlog.get_logger(logger_name="callback_supervisor")
+
+
+def execute_callback(
+ callback_path: str,
+ callback_kwargs: dict,
+ log,
+) -> tuple[bool, str | None]:
+ """
+ Execute a callback function by importing and calling it, returning the
success state.
+
+ Supports two patterns:
+ 1. Functions - called directly with kwargs
+ 2. Classes that return callable instances (like BaseNotifier) -
instantiated then called with context
+
+ Example:
+ # Function callback
+ execute_callback("my_module.alert_func", {"msg": "Alert!", "context":
{...}}, log)
+
+ # Notifier callback
+ execute_callback("airflow.providers.slack...SlackWebhookNotifier",
{"text": "Alert!"}, log)
+
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log: Logger instance for recording execution.
+ :return: Tuple of (success: bool, error_message: str | None)
+ """
+ from airflow.sdk._shared.module_loading import accepts_context
+
+ if not callback_path:
+ return False, "Callback path not found."
+
+ try:
+ # Import the callback callable
+ # Expected format: "module.path.to.function_or_class"
+ module_path, function_name = callback_path.rsplit(".", 1)
+ module = import_module(module_path)
+ callback_callable = getattr(module, function_name)
+
+ log.debug("Executing callback %s(%s)...", callback_path,
callback_kwargs)
+
+ kwargs_without_context = {k: v for k, v in callback_kwargs.items() if
k != "context"}
+
+ # Call the callable with all kwargs if it accepts context, otherwise
strip context.
+ if accepts_context(callback_callable):
+ result = callback_callable(**callback_kwargs)
+ else:
+ result = callback_callable(**kwargs_without_context)
+
+ # If the callback was a class then it is now instantiated and
callable, call it.
+ if callable(result):
+ if accepts_context(result):
+ result = result(**callback_kwargs)
+ else:
+ result = result(**kwargs_without_context)
+
+ log.info("Callback %s executed successfully.", callback_path)
+ return True, None
+
+ except Exception as e:
+ error_msg = f"Callback execution failed: {type(e).__name__}: {str(e)}"
+ log.exception("Callback %s(%s) execution failed: %s", callback_path,
callback_kwargs, error_msg)
+ return False, error_msg
+
+
+# An empty message set; the callback subprocess doesn't currently communicate
back to the
+# supervisor. This means callback code cannot access runtime services like
Connection.get()
+# or Variable.get() which require the supervisor to pass requests to the API
server.
+# To enable this, add the needed message types here and implement
_handle_request accordingly.
+# See ActivitySubprocess.decoder in supervisor.py for the full task message
set and examples.
+_EmptyMessage: TypeAdapter[None] = TypeAdapter(None)
+
+
[email protected](kw_only=True)
+class CallbackSubprocess(WatchedSubprocess):
+ """
+ Supervised subprocess for executing callbacks.
+
+ Uses the WatchedSubprocess infrastructure for fork/monitor/signal handling
+ while keeping a simple lifecycle: start, run callback, exit.
+ """
+
+ decoder: ClassVar[TypeAdapter] = _EmptyMessage
+
+ @classmethod
+ def start( # type: ignore[override]
+ cls,
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ logger: FilteringBoundLogger | None = None,
+ **kwargs,
+ ) -> Self:
+ """Fork and start a new subprocess to execute the given callback."""
+ from uuid import UUID
+
+ # Use a closure to pass callback data to the child process. Note that
this
+ # ONLY works because WatchedSubprocess.start() uses os.fork(), so the
child
+ # inherits the parent's memory space and the variables are available
directly.
+ def _target():
+ import sys
+
+ _log = structlog.get_logger(logger_name="callback_runner")
+ success, error_msg = execute_callback(callback_path,
callback_kwargs, _log)
+ if not success:
+ _log.error("Callback failed", error=error_msg)
+ sys.exit(1)
+
+ return super().start(
+ id=UUID(id) if not isinstance(id, UUID) else id,
+ target=_target,
+ logger=logger,
+ **kwargs,
+ )
+
+ def wait(self) -> int:
+ """
+ Wait for the callback subprocess to complete.
+
+ Mirrors the structure of ActivitySubprocess.wait() but without
heartbeating,
+ task API state management, or log uploading.
+ """
+ if self._exit_code is not None:
+ return self._exit_code
+
+ try:
+ self._monitor_subprocess()
+ finally:
+ self.selector.close()
+
+ self._exit_code = self._exit_code if self._exit_code is not None else 1
+ return self._exit_code
+
+ def _monitor_subprocess(self):
+ """
+ Monitor the subprocess until it exits.
+
+ A simplified version of ActivitySubprocess._monitor_subprocess()
without heartbeating
+ or timeout handling, just process output monitoring and stuck-socket
cleanup.
+ """
+ while self._exit_code is None or self._open_sockets:
+ self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)
+
+ # If the process has exited but sockets remain open, apply a
timeout
+ # to prevent hanging indefinitely on stuck sockets.
+ if self._exit_code is not None and self._open_sockets:
+ if (
+ self._process_exit_monotonic
+ and time.monotonic() - self._process_exit_monotonic >
SOCKET_CLEANUP_TIMEOUT
+ ):
+ log.warning(
+ "Process exited with open sockets; cleaning up after
timeout",
+ pid=self.pid,
+ exit_code=self._exit_code,
+ socket_types=list(self._open_sockets.values()),
+ timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
+ )
+ self._cleanup_open_sockets()
+
+ def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) ->
None:
+ """Handle incoming requests from the callback subprocess (currently
none expected)."""
+ log.warning("Unexpected request from callback subprocess", msg=msg)
+
+
+def _configure_logging(log_path: str) -> tuple[FilteringBoundLogger, BinaryIO]:
+ """Configure file-based logging for the callback subprocess."""
+ from airflow.sdk.log import init_log_file, logging_processors
+
+ log_file = init_log_file(log_path)
+ log_file_descriptor: BinaryIO = log_file.open("ab")
+ underlying_logger = structlog.BytesLogger(log_file_descriptor)
+ processors = logging_processors(json_output=True)
+ logger = structlog.wrap_logger(underlying_logger, processors=processors,
logger_name="callback").bind()
+
+ return logger, log_file_descriptor
+
+
+def supervise_callback(
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ log_path: str | None = None,
+ bundle_info: _BundleInfoLike | None = None,
+) -> int:
+ """
+ Run a single callback execution to completion in a supervised subprocess.
+
+ :param id: Unique identifier for this callback execution.
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log_path: Path to write logs, if required.
+ :param bundle_info: When provided, the bundle's path is added to sys.path
so callbacks in Dag Bundles are importable.
+ :return: Exit code of the subprocess (0 = success).
+ """
+ import sys
Review Comment:
`import sys` (here and at line 150 inside `_target()`) and `from uuid import UUID`
(line 144) are stdlib imports with no circular-import risk, so they should be at
the top of the file.
##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of callback workloads."""
+
+from __future__ import annotations
+
+import time
+from importlib import import_module
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, Protocol
+
+import attrs
+import structlog
+from pydantic import TypeAdapter
+
+from airflow.sdk.execution_time.supervisor import (
+ MIN_HEARTBEAT_INTERVAL,
+ SOCKET_CLEANUP_TIMEOUT,
+ WatchedSubprocess,
+)
+
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger
+ from typing_extensions import Self
+
+ # Core (airflow.executors.workloads.base.BundleInfo) and SDK
(airflow.sdk.api.datamodels._generated.BundleInfo)
+ # are structurally identical, but MyPy treats them as different types.
This Protocol makes MyPy happy.
+ class _BundleInfoLike(Protocol):
+ name: str
+ version: str | None
+
+
+__all__ = ["CallbackSubprocess", "supervise_callback"]
+
+log: FilteringBoundLogger =
structlog.get_logger(logger_name="callback_supervisor")
+
+
+def execute_callback(
+ callback_path: str,
+ callback_kwargs: dict,
+ log,
+) -> tuple[bool, str | None]:
+ """
+ Execute a callback function by importing and calling it, returning the
success state.
+
+ Supports two patterns:
+ 1. Functions - called directly with kwargs
+ 2. Classes that return callable instances (like BaseNotifier) -
instantiated then called with context
+
+ Example:
+ # Function callback
+ execute_callback("my_module.alert_func", {"msg": "Alert!", "context":
{...}}, log)
+
+ # Notifier callback
+ execute_callback("airflow.providers.slack...SlackWebhookNotifier",
{"text": "Alert!"}, log)
+
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log: Logger instance for recording execution.
+ :return: Tuple of (success: bool, error_message: str | None)
+ """
+ from airflow.sdk._shared.module_loading import accepts_context
+
+ if not callback_path:
+ return False, "Callback path not found."
+
+ try:
+ # Import the callback callable
+ # Expected format: "module.path.to.function_or_class"
+ module_path, function_name = callback_path.rsplit(".", 1)
+ module = import_module(module_path)
+ callback_callable = getattr(module, function_name)
+
+ log.debug("Executing callback %s(%s)...", callback_path,
callback_kwargs)
+
+ kwargs_without_context = {k: v for k, v in callback_kwargs.items() if
k != "context"}
+
+ # Call the callable with all kwargs if it accepts context, otherwise
strip context.
+ if accepts_context(callback_callable):
+ result = callback_callable(**callback_kwargs)
+ else:
+ result = callback_callable(**kwargs_without_context)
+
+ # If the callback was a class then it is now instantiated and
callable, call it.
+ if callable(result):
+ if accepts_context(result):
+ result = result(**callback_kwargs)
+ else:
+ result = result(**kwargs_without_context)
+
+ log.info("Callback %s executed successfully.", callback_path)
+ return True, None
+
+ except Exception as e:
+ error_msg = f"Callback execution failed: {type(e).__name__}: {str(e)}"
+ log.exception("Callback %s(%s) execution failed: %s", callback_path,
callback_kwargs, error_msg)
+ return False, error_msg
+
+
+# An empty message set; the callback subprocess doesn't currently communicate
back to the
+# supervisor. This means callback code cannot access runtime services like
Connection.get()
+# or Variable.get() which require the supervisor to pass requests to the API
server.
+# To enable this, add the needed message types here and implement
_handle_request accordingly.
+# See ActivitySubprocess.decoder in supervisor.py for the full task message
set and examples.
+_EmptyMessage: TypeAdapter[None] = TypeAdapter(None)
+
+
[email protected](kw_only=True)
+class CallbackSubprocess(WatchedSubprocess):
+ """
+ Supervised subprocess for executing callbacks.
+
+ Uses the WatchedSubprocess infrastructure for fork/monitor/signal handling
+ while keeping a simple lifecycle: start, run callback, exit.
+ """
+
+ decoder: ClassVar[TypeAdapter] = _EmptyMessage
+
+ @classmethod
+ def start( # type: ignore[override]
+ cls,
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ logger: FilteringBoundLogger | None = None,
+ **kwargs,
+ ) -> Self:
+ """Fork and start a new subprocess to execute the given callback."""
+ from uuid import UUID
+
+ # Use a closure to pass callback data to the child process. Note that
this
+ # ONLY works because WatchedSubprocess.start() uses os.fork(), so the
child
+ # inherits the parent's memory space and the variables are available
directly.
+ def _target():
+ import sys
+
+ _log = structlog.get_logger(logger_name="callback_runner")
+ success, error_msg = execute_callback(callback_path,
callback_kwargs, _log)
+ if not success:
+ _log.error("Callback failed", error=error_msg)
+ sys.exit(1)
+
+ return super().start(
+ id=UUID(id) if not isinstance(id, UUID) else id,
+ target=_target,
+ logger=logger,
+ **kwargs,
+ )
+
+ def wait(self) -> int:
+ """
+ Wait for the callback subprocess to complete.
+
+ Mirrors the structure of ActivitySubprocess.wait() but without
heartbeating,
+ task API state management, or log uploading.
+ """
+ if self._exit_code is not None:
+ return self._exit_code
+
+ try:
+ self._monitor_subprocess()
+ finally:
+ self.selector.close()
+
+ self._exit_code = self._exit_code if self._exit_code is not None else 1
+ return self._exit_code
+
+ def _monitor_subprocess(self):
+ """
+ Monitor the subprocess until it exits.
+
+ A simplified version of ActivitySubprocess._monitor_subprocess()
without heartbeating
+ or timeout handling, just process output monitoring and stuck-socket
cleanup.
+ """
+ while self._exit_code is None or self._open_sockets:
+ self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)
+
+ # If the process has exited but sockets remain open, apply a
timeout
+ # to prevent hanging indefinitely on stuck sockets.
+ if self._exit_code is not None and self._open_sockets:
+ if (
+ self._process_exit_monotonic
+ and time.monotonic() - self._process_exit_monotonic >
SOCKET_CLEANUP_TIMEOUT
+ ):
+ log.warning(
+ "Process exited with open sockets; cleaning up after
timeout",
+ pid=self.pid,
+ exit_code=self._exit_code,
+ socket_types=list(self._open_sockets.values()),
+ timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
+ )
+ self._cleanup_open_sockets()
+
+ def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) ->
None:
+ """Handle incoming requests from the callback subprocess (currently
none expected)."""
+ log.warning("Unexpected request from callback subprocess", msg=msg)
+
+
+def _configure_logging(log_path: str) -> tuple[FilteringBoundLogger, BinaryIO]:
+ """Configure file-based logging for the callback subprocess."""
+ from airflow.sdk.log import init_log_file, logging_processors
+
+ log_file = init_log_file(log_path)
+ log_file_descriptor: BinaryIO = log_file.open("ab")
+ underlying_logger = structlog.BytesLogger(log_file_descriptor)
+ processors = logging_processors(json_output=True)
+ logger = structlog.wrap_logger(underlying_logger, processors=processors,
logger_name="callback").bind()
+
+ return logger, log_file_descriptor
+
+
+def supervise_callback(
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ log_path: str | None = None,
+ bundle_info: _BundleInfoLike | None = None,
+) -> int:
+ """
+ Run a single callback execution to completion in a supervised subprocess.
+
+ :param id: Unique identifier for this callback execution.
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log_path: Path to write logs, if required.
+ :param bundle_info: When provided, the bundle's path is added to sys.path
so callbacks in Dag Bundles are importable.
+ :return: Exit code of the subprocess (0 = success).
+ """
+ import sys
+
+ start = time.monotonic()
+
+ # If bundle info is provided, initialize the bundle and ensure its path is
importable.
+ # This is needed for user-defined callbacks that live inside a DAG bundle
rather than
+ # in an installed package or the plugins directory.
+ if bundle_info and bundle_info.name:
+ try:
+ from airflow.dag_processing.bundles.manager import
DagBundlesManager
+
+ bundle = DagBundlesManager().get_bundle(
+ name=bundle_info.name,
+ version=bundle_info.version,
+ )
+ bundle.initialize()
Review Comment:
Bundle initialization (`bundle.initialize()`, which may do git clones or file
reads) runs in the supervisor process before `os.fork()`. If it hangs or
crashes, it takes down the LocalExecutor worker.
`supervise_task` handles bundle initialization inside `ActivitySubprocess.start()`,
in the child process. Consider moving this into the forked child (inside
`_target()` or `CallbackSubprocess.start()`).
##########
task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py:
##########
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of callback workloads."""
+
+from __future__ import annotations
+
+import time
+from importlib import import_module
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, Protocol
+
+import attrs
+import structlog
+from pydantic import TypeAdapter
+
+from airflow.sdk.execution_time.supervisor import (
+ MIN_HEARTBEAT_INTERVAL,
+ SOCKET_CLEANUP_TIMEOUT,
+ WatchedSubprocess,
+)
+
+if TYPE_CHECKING:
+ from structlog.typing import FilteringBoundLogger
+ from typing_extensions import Self
+
+ # Core (airflow.executors.workloads.base.BundleInfo) and SDK
(airflow.sdk.api.datamodels._generated.BundleInfo)
+ # are structurally identical, but MyPy treats them as different types.
This Protocol makes MyPy happy.
+ class _BundleInfoLike(Protocol):
+ name: str
+ version: str | None
+
+
+__all__ = ["CallbackSubprocess", "supervise_callback"]
+
+log: FilteringBoundLogger =
structlog.get_logger(logger_name="callback_supervisor")
+
+
+def execute_callback(
+ callback_path: str,
+ callback_kwargs: dict,
+ log,
+) -> tuple[bool, str | None]:
+ """
+ Execute a callback function by importing and calling it, returning the
success state.
+
+ Supports two patterns:
+ 1. Functions - called directly with kwargs
+ 2. Classes that return callable instances (like BaseNotifier) -
instantiated then called with context
+
+ Example:
+ # Function callback
+ execute_callback("my_module.alert_func", {"msg": "Alert!", "context":
{...}}, log)
+
+ # Notifier callback
+ execute_callback("airflow.providers.slack...SlackWebhookNotifier",
{"text": "Alert!"}, log)
+
+ :param callback_path: Dot-separated import path to the callback function
or class.
+ :param callback_kwargs: Keyword arguments to pass to the callback.
+ :param log: Logger instance for recording execution.
+ :return: Tuple of (success: bool, error_message: str | None)
+ """
+ from airflow.sdk._shared.module_loading import accepts_context
+
+ if not callback_path:
+ return False, "Callback path not found."
+
+ try:
+ # Import the callback callable
+ # Expected format: "module.path.to.function_or_class"
+ module_path, function_name = callback_path.rsplit(".", 1)
+ module = import_module(module_path)
+ callback_callable = getattr(module, function_name)
+
+ log.debug("Executing callback %s(%s)...", callback_path,
callback_kwargs)
+
+ kwargs_without_context = {k: v for k, v in callback_kwargs.items() if
k != "context"}
+
+ # Call the callable with all kwargs if it accepts context, otherwise
strip context.
+ if accepts_context(callback_callable):
+ result = callback_callable(**callback_kwargs)
+ else:
+ result = callback_callable(**kwargs_without_context)
+
+ # If the callback was a class then it is now instantiated and
callable, call it.
+ if callable(result):
+ if accepts_context(result):
+ result = result(**callback_kwargs)
+ else:
+ result = result(**kwargs_without_context)
+
+ log.info("Callback %s executed successfully.", callback_path)
+ return True, None
+
+ except Exception as e:
+ error_msg = f"Callback execution failed: {type(e).__name__}: {str(e)}"
+ log.exception("Callback %s(%s) execution failed: %s", callback_path,
callback_kwargs, error_msg)
+ return False, error_msg
+
+
+# An empty message set; the callback subprocess doesn't currently communicate
back to the
+# supervisor. This means callback code cannot access runtime services like
Connection.get()
+# or Variable.get() which require the supervisor to pass requests to the API
server.
+# To enable this, add the needed message types here and implement
_handle_request accordingly.
+# See ActivitySubprocess.decoder in supervisor.py for the full task message
set and examples.
+_EmptyMessage: TypeAdapter[None] = TypeAdapter(None)
+
+
[email protected](kw_only=True)
+class CallbackSubprocess(WatchedSubprocess):
+ """
+ Supervised subprocess for executing callbacks.
+
+ Uses the WatchedSubprocess infrastructure for fork/monitor/signal handling
+ while keeping a simple lifecycle: start, run callback, exit.
+ """
+
+ decoder: ClassVar[TypeAdapter] = _EmptyMessage
+
+ @classmethod
+ def start( # type: ignore[override]
+ cls,
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ logger: FilteringBoundLogger | None = None,
+ **kwargs,
+ ) -> Self:
+ """Fork and start a new subprocess to execute the given callback."""
+ from uuid import UUID
+
+ # Use a closure to pass callback data to the child process. Note that
this
+ # ONLY works because WatchedSubprocess.start() uses os.fork(), so the
child
+ # inherits the parent's memory space and the variables are available
directly.
+ def _target():
+ import sys
+
+ _log = structlog.get_logger(logger_name="callback_runner")
+ success, error_msg = execute_callback(callback_path,
callback_kwargs, _log)
+ if not success:
+ _log.error("Callback failed", error=error_msg)
+ sys.exit(1)
+
+ return super().start(
+ id=UUID(id) if not isinstance(id, UUID) else id,
+ target=_target,
+ logger=logger,
+ **kwargs,
+ )
+
+ def wait(self) -> int:
+ """
+ Wait for the callback subprocess to complete.
+
+ Mirrors the structure of ActivitySubprocess.wait() but without
heartbeating,
+ task API state management, or log uploading.
+ """
+ if self._exit_code is not None:
+ return self._exit_code
+
+ try:
+ self._monitor_subprocess()
+ finally:
+ self.selector.close()
+
+ self._exit_code = self._exit_code if self._exit_code is not None else 1
+ return self._exit_code
+
+ def _monitor_subprocess(self):
+ """
+ Monitor the subprocess until it exits.
+
+ A simplified version of ActivitySubprocess._monitor_subprocess()
without heartbeating
+ or timeout handling, just process output monitoring and stuck-socket
cleanup.
+ """
+ while self._exit_code is None or self._open_sockets:
+ self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)
+
+ # If the process has exited but sockets remain open, apply a
timeout
+ # to prevent hanging indefinitely on stuck sockets.
+ if self._exit_code is not None and self._open_sockets:
+ if (
+ self._process_exit_monotonic
+ and time.monotonic() - self._process_exit_monotonic >
SOCKET_CLEANUP_TIMEOUT
+ ):
+ log.warning(
+ "Process exited with open sockets; cleaning up after
timeout",
+ pid=self.pid,
+ exit_code=self._exit_code,
+ socket_types=list(self._open_sockets.values()),
+ timeout_seconds=SOCKET_CLEANUP_TIMEOUT,
+ )
+ self._cleanup_open_sockets()
+
+ def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) ->
None:
+ """Handle incoming requests from the callback subprocess (currently
none expected)."""
+ log.warning("Unexpected request from callback subprocess", msg=msg)
+
+
+def _configure_logging(log_path: str) -> tuple[FilteringBoundLogger, BinaryIO]:
+ """Configure file-based logging for the callback subprocess."""
+ from airflow.sdk.log import init_log_file, logging_processors
+
+ log_file = init_log_file(log_path)
+ log_file_descriptor: BinaryIO = log_file.open("ab")
+ underlying_logger = structlog.BytesLogger(log_file_descriptor)
+ processors = logging_processors(json_output=True)
+ logger = structlog.wrap_logger(underlying_logger, processors=processors,
logger_name="callback").bind()
+
+ return logger, log_file_descriptor
+
+
+def supervise_callback(
+ *,
+ id: str,
+ callback_path: str,
+ callback_kwargs: dict,
+ log_path: str | None = None,
+ bundle_info: _BundleInfoLike | None = None,
+) -> int:
+ """
Review Comment:
`supervise_task` calls `_make_process_nondumpable()` to prevent core dumps
from leaking secrets. `supervise_callback` does not, even though callbacks
handle connection credentials (Notifiers with Slack tokens, PagerDuty keys,
etc.). Consider calling `_make_process_nondumpable()` in `supervise_callback` as well.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2000,7 +1999,7 @@ def _configure_logging(log_path: str, client: Client) ->
tuple[FilteringBoundLog
return logger, log_file_descriptor
-def supervise(
+def supervise_task(
Review Comment:
The Edge3 provider still calls `supervise()` directly (only a comment was
updated in this PR), so it will trigger the deprecation warning immediately
after merge.
Consider keeping `supervise` as the task-specific function (no rename, no
deprecation) and adding `supervise_workload` and `supervise_callback` alongside
it. That means less churn and no breakage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]