johnhoran commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2717890364
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -210,3 +212,15 @@ def annotations_for_logging_task_metadata(annotation_set):
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging
+
+
+def serializable_callback(f):
+ """Convert async callback so it can run in sync or async mode."""
+
+ @wraps(f)
+ def wrapper(*args, mode: str, **kwargs):
+ if mode == ExecutionMode.ASYNC:
+ return f(*args, mode=mode, **kwargs)
+ return asyncio.run(f(*args, mode=mode, **kwargs))
+
Review Comment:
Well the idea was to define a helper function that could be used in a
callback like
```python
class AsyncCallback(KubernetesPodOperatorCallback):
@staticmethod
@serializable_callback
async def progress_callback(
*, line: str, **kwargs
) -> None:
...
```
and that the same callback could be used in the triggerer or in the
operator, which is invoked when the triggerer hands back to the operator if
there are remaining logs in the pod that haven't been processed. Ideally then
the callback should be written in async format, as it would be blocking in the
callback if it wasn't, though that obviously depends on what the callback is
doing.
Invocations of these callbacks are in the operator/triggerer, I don't know
of anywhere else they are used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]