dabla commented on code in PR #59876:
URL: https://github.com/apache/airflow/pull/59876#discussion_r2672666601
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -736,17 +734,52 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     )
-# This global variable will be used by Connection/Variable/XCom classes, or other parts of the task's execution,
+# This global class will be used by Connection/Variable/XCom classes, or other parts of the task's execution,
 # to send requests back to the supervisor process.
 #
 # Why it needs to be a global:
 # - Many parts of Airflow's codebase (e.g., connections, variables, and XComs) may rely on making dynamic requests
 # to the parent process during task execution.
 # - These calls occur in various locations and cannot easily pass the `CommsDecoder` instance through the
 # deeply nested execution stack.
-# - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily
+# - By defining this as a static class with accessors, it ensures that this communication mechanism is readily
 # accessible wherever needed during task execution without modifying every layer of the call stack.
-SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
+# Not perfect but getter than a global variable.
+class _SupervisorCommsHolder:
+    comms: CommsDecoder[ToTask, ToSupervisor] | None = None
+
+
+def supervisor_send(msg: ToSupervisor) -> ToTask | None:
+    """Send a message to the supervisor as convenience for get_supervisor_comms().send()."""
+    if _SupervisorCommsHolder.comms is None:
+        raise RuntimeError("Supervisor comms not initialized yet. Call set_supervisor_comms() instead.")
+    return _SupervisorCommsHolder.comms.send(msg)
Review Comment:
In the past I created a Singleton metaclass: any class that declares it as its
metaclass behaves like a singleton, so even if you call the constructor multiple
times you always end up with the same instance. In Java this is simple to
achieve by defining your class as an enum, but Python has no direct equivalent.
So I ended up writing a Singleton metaclass for Python:
```
class Singleton(type):
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
```
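For illustration, here is a minimal usage sketch of that metaclass. The `Holder` class and its `name` attribute are hypothetical and not part of the PR; the metaclass is repeated so the snippet runs on its own. Note that constructor arguments passed on later calls are silently ignored, which may or may not be what you want.

```python
class Singleton(type):
    """Metaclass that caches one instance per class."""

    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        # Only construct the instance the first time the class is called.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


# Hypothetical class, used only to demonstrate the behaviour.
class Holder(metaclass=Singleton):
    def __init__(self, name: str = "default"):
        self.name = name


first = Holder("a")
second = Holder("b")  # __init__ is not run again; the cached instance is returned

same_instance = first is second
kept_name = second.name  # still the name given on the first call
```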
I did the same for the Flyweight pattern:
```
import inspect
from itertools import count


class Flyweight(type):
    _instances: dict = {}
    _ignore_args: dict = {}

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        return super().__prepare__(name, bases, **kwargs)

    def __new__(mcs, name, bases, namespace, **kwargs):
        return super().__new__(mcs, name, bases, namespace)

    def __init__(cls, name, bases, namespace, **kwargs):
        super().__init__(name, bases, namespace, **kwargs)
        if kwargs.get("ignore_args"):
            cls._ignore_args[name] = kwargs.get("ignore_args")

    def __filter_ignored_arguments(cls, *args, **kwargs):
        parameters = list(inspect.signature(cls.__init__).parameters.values())
        parameters.pop(0)
        if len(kwargs) > 0:
            constructor_args = [
                (index, name) for index, name in enumerate(kwargs.keys())
            ]
            args = tuple(kwargs.values())
        else:
            constructor_args = [
                (index, parameter.name) for index, parameter in enumerate(parameters)
            ]
        ignored_indices = list(
            map(
                lambda arg: arg[0],
                filter(
                    lambda arg: arg[1] in cls._ignore_args.get(cls.__name__, []),
                    constructor_args,
                ),
            )
        )
        index = count(start=0, step=1)
        return [value for value in args if next(index) not in ignored_indices]

    def __call__(cls, *args, **kwargs):
        key = "{}-{}".format(
            cls.__name__, cls.__filter_ignored_arguments(*args, **kwargs).__str__()
        )
        if key not in cls._instances:
            cls._instances[key] = super(Flyweight, cls).__call__(*args, **kwargs)
        return cls._instances[key]
```
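As a rough sketch of how this could be used, below is a compact variant of the same idea, not the exact code above. It assumes the cache key can be built with `inspect.Signature.bind`, so that positional and keyword calls normalize to the same key, and it keeps one cache per class. The `Endpoint` class and its parameters are hypothetical, and the sketch assumes hashable constructor arguments and an `__init__` without `*args`/`**kwargs`.

```python
import inspect


class Flyweight(type):
    """Metaclass that caches instances per class and per non-ignored constructor arguments."""

    def __new__(mcs, name, bases, namespace, ignore_args=(), **kwargs):
        # Consume ignore_args here so it is not forwarded to __init_subclass__.
        return super().__new__(mcs, name, bases, namespace, **kwargs)

    def __init__(cls, name, bases, namespace, ignore_args=(), **kwargs):
        super().__init__(name, bases, namespace)
        cls._ignore_args = frozenset(ignore_args)
        cls._instances = {}  # one cache per class

    def __call__(cls, *args, **kwargs):
        # Bind against __init__ (None stands in for self) and fill in defaults,
        # so Endpoint("db") and Endpoint(host="db") produce the same key.
        bound = inspect.signature(cls.__init__).bind(None, *args, **kwargs)
        bound.apply_defaults()
        items = list(bound.arguments.items())[1:]  # skip the self placeholder
        # Assumption: all remaining values are hashable.
        key = tuple((n, v) for n, v in items if n not in cls._ignore_args)
        if key not in cls._instances:
            cls._instances[key] = super().__call__(*args, **kwargs)
        return cls._instances[key]


# Hypothetical class: instances are shared per (host, port); timeout is ignored.
class Endpoint(metaclass=Flyweight, ignore_args=["timeout"]):
    def __init__(self, host: str, port: int = 5432, timeout: float = 1.0):
        self.host = host
        self.port = port
        self.timeout = timeout


a = Endpoint("db", 5432, timeout=1.0)
b = Endpoint(host="db", timeout=30.0)  # same host/port, different timeout
c = Endpoint("other")

shared = a is b
distinct = a is not c
```

As with the singleton, the first call wins: `b.timeout` is still `1.0` here because the cached instance is returned without running `__init__` again.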
Not sure whether these could be a nice addition for solving this kind of generic problem?