jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r422634752
##########
File path: airflow/models/taskinstance.py
##########
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
session.merge(self)
session.commit()
+ def get_additional_execution_contextmanager(self, execution_context):
+ """
+ Retrieves the user defined execution context callback from the
configuration,
+ and validates that it is indeed a context manager
+
+ :param execution_context: the current execution context to be passed
to user ctx
+ """
+ additional_execution_contextmanager = conf.getimport("core",
"additional_execute_contextmanager")
+ if additional_execution_contextmanager:
+ try:
+ user_ctx_obj = additional_execution_contextmanager(self,
execution_context)
+ if hasattr(user_ctx_obj, "__enter__") and
hasattr(user_ctx_obj, "__exit__"):
+ return user_ctx_obj
+ else:
+ raise AirflowException(f"Loaded function
{additional_execution_contextmanager} "
+ f"as additional execution
contextmanager, but it does not have "
+ f"__enter__ or __exit__ method!")
Review comment:
The tests verify that this works for decorated contextmanagers as well :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]