dabla commented on code in PR #59876:
URL: https://github.com/apache/airflow/pull/59876#discussion_r2672666601


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -736,17 +734,52 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     )
 
 
-# This global variable will be used by Connection/Variable/XCom classes, or other parts of the task's execution,
+# This global class will be used by Connection/Variable/XCom classes, or other parts of the task's execution,
 # to send requests back to the supervisor process.
 #
 # Why it needs to be a global:
 # - Many parts of Airflow's codebase (e.g., connections, variables, and XComs) may rely on making dynamic requests
 #   to the parent process during task execution.
 # - These calls occur in various locations and cannot easily pass the `CommsDecoder` instance through the
 #   deeply nested execution stack.
-# - By defining `SUPERVISOR_COMMS` as a global, it ensures that this communication mechanism is readily
+# - By defining this as a static class with accessors, it ensures that this communication mechanism is readily
 #   accessible wherever needed during task execution without modifying every layer of the call stack.
-SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
+#   Not perfect but better than a global variable.
+class _SupervisorCommsHolder:
+    comms: CommsDecoder[ToTask, ToSupervisor] | None = None
+
+
+def supervisor_send(msg: ToSupervisor) -> ToTask | None:
+    """Send a message to the supervisor as convenience for get_supervisor_comms().send()."""
+    if _SupervisorCommsHolder.comms is None:
+        raise RuntimeError("Supervisor comms not initialized yet. Call set_supervisor_comms() instead.")
+    return _SupervisorCommsHolder.comms.send(msg)

Review Comment:
   In the past I created a Singleton metaclass: any class that uses it as its
   metaclass behaves like a singleton, so however many times you call the
   constructor, you always get the same instance. In Java this is simple to
   achieve by defining your class as an enum, but Python has no direct
   equivalent.
   
   So I ended up creating a Singleton class for Python:
   
   ```
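   class Singleton(type):
       # One cached instance per class that uses this metaclass.
       _instances: dict = {}

       def __call__(cls, *args, **kwargs):
           # Build the instance on the first call only; later calls return it.
           if cls not in cls._instances:
               cls._instances[cls] = super().__call__(*args, **kwargs)
           return cls._instances[cls]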
   ```
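
   A minimal usage sketch of this metaclass, repeated here so the block runs on
   its own. `AppConfig` is a hypothetical class used only for illustration; it
   is not part of the PR or of Airflow.

   ```python
   class Singleton(type):
       _instances: dict = {}

       def __call__(cls, *args, **kwargs):
           if cls not in cls._instances:
               cls._instances[cls] = super().__call__(*args, **kwargs)
           return cls._instances[cls]


   # Hypothetical example class, only here to exercise the metaclass.
   class AppConfig(metaclass=Singleton):
       def __init__(self, env="dev"):
           self.env = env


   first = AppConfig("prod")
   # The second call returns the cached instance; its arguments are not applied.
   second = AppConfig("test")

   assert first is second
   assert second.env == "prod"
   ```

   One consequence worth keeping in mind: arguments passed to any call after
   the first are silently dropped, because `__init__` does not run again.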
   
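   I did the same for the Flyweight pattern, which caches one instance per
   distinct set of constructor arguments (arguments named in `ignore_args` are
   left out of the cache key):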
   
   ```
   import inspect


   class Flyweight(type):
       # Cached instances, keyed by class name plus the relevant arguments.
       _instances: dict = {}
       # Argument names to leave out of the key, per class name.
       _ignore_args: dict = {}

       def __new__(mcs, name, bases, namespace, **kwargs):
           # Drop class keywords such as ignore_args; type.__new__ would pass
           # them on to __init_subclass__, which rejects unknown keywords.
           return super().__new__(mcs, name, bases, namespace)

       def __init__(cls, name, bases, namespace, **kwargs):
           super().__init__(name, bases, namespace)
           if kwargs.get("ignore_args"):
               cls._ignore_args[name] = kwargs["ignore_args"]

       def __filter_ignored_arguments(cls, *args, **kwargs):
           # Bind the call to the __init__ signature (None stands in for self)
           # so positional, keyword, and mixed calls yield the same key.
           bound = inspect.signature(cls.__init__).bind(None, *args, **kwargs)
           bound.apply_defaults()
           ignored = cls._ignore_args.get(cls.__name__, [])
           items = list(bound.arguments.items())[1:]  # skip self
           return [(name, value) for name, value in items if name not in ignored]

       def __call__(cls, *args, **kwargs):
           key = "{}-{}".format(
               cls.__name__, cls.__filter_ignored_arguments(*args, **kwargs)
           )
           if key not in cls._instances:
               cls._instances[key] = super().__call__(*args, **kwargs)
           return cls._instances[key]
   ```
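
   To show how `ignore_args` affects instance sharing, here is a condensed,
   self-contained sketch of the same idea with a usage example. It is an
   illustration under assumptions, not the exact code above: it keeps the
   ignored names on the class itself (the `_ignored` attribute is made up for
   this sketch) and keys the cache on the class plus the `repr` of the bound
   arguments. `DbClient` is a hypothetical class.

   ```python
   import inspect


   class Flyweight(type):
       """Condensed sketch: one cached instance per class and constructor arguments."""

       _instances: dict = {}

       def __new__(mcs, name, bases, namespace, **kwargs):
           # Drop class keywords such as ignore_args before calling type.__new__.
           return super().__new__(mcs, name, bases, namespace)

       def __init__(cls, name, bases, namespace, ignore_args=()):
           super().__init__(name, bases, namespace)
           cls._ignored = frozenset(ignore_args)  # assumption: stored per class

       def __call__(cls, *args, **kwargs):
           # Bind to the __init__ signature (None stands in for self) so that
           # positional and keyword calls map to the same key.
           bound = inspect.signature(cls.__init__).bind(None, *args, **kwargs)
           bound.apply_defaults()
           items = list(bound.arguments.items())[1:]  # skip self
           key = (cls, repr([(n, v) for n, v in items if n not in cls._ignored]))
           if key not in cls._instances:
               cls._instances[key] = super().__call__(*args, **kwargs)
           return cls._instances[key]


   # Hypothetical class used only for illustration.
   class DbClient(metaclass=Flyweight, ignore_args=["logger"]):
       def __init__(self, conn_id, logger=None):
           self.conn_id = conn_id
           self.logger = logger


   a = DbClient("warehouse", logger="log-a")
   b = DbClient(conn_id="warehouse", logger="log-b")  # same key: logger is ignored
   c = DbClient("reporting")                          # different conn_id, new instance

   assert a is b
   assert a is not c
   assert b.logger == "log-a"
   ```

   As with the singleton, the first call wins: `b` shares the instance built
   for `a`, so the `logger` passed in the second call is never applied.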
   
   Dunno if this could be a nice addition to solve those generic problems?
   