ashb commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215551823
##########
File path: airflow/executors/celery_executor.py
##########
@@ -63,6 +69,40 @@ def execute_command(command):
raise AirflowException('Celery command failed')
+class ExceptionWithTraceback(object):
+    """
+    Wrapper class used to propagate exceptions to parent processes from subprocesses.
+    :param exception: The exception to wrap
+    :type exception: Exception
+    :param traceback: The stacktrace to wrap
+    :type traceback: str
+    """
+
+    def __init__(self, exception, exception_traceback):
+        self.exception = exception
+        self.traceback = exception_traceback
+
+
+def fetch_celery_task_state(celery_task):
+    """
+    Fetch and return the state of the given celery task. The scope of this function is
+    global so that it can be called by subprocesses in the pool.
+    :param celery_task: a tuple of the Celery task key and the async Celery object used
+        to fetch the task's state
+    :type celery_task: (str, celery.result.AsyncResult)
+    :return: a tuple of the Celery task key and the Celery state of the task
+    :rtype: (str, str)
+    """
+
+    try:
+        res = (celery_task[0], celery_task[1].state)
Review comment:
I'm guessing that it's accessing the `.state` property that causes Celery to
make the network request? Could you add a comment here saying so? (Otherwise
this whole function looks odd and pointless.)
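
To illustrate the point of the comment, here is a minimal sketch of how a plain-looking attribute read can hide a backend lookup. `FakeBackend` and `FakeAsyncResult` are hypothetical stand-ins written for this example, not Celery's actual classes, and the sketch simply assumes the reviewer's guess that reading `.state` triggers a query.

```python
class FakeBackend(object):
    """Hypothetical result store that counts how often it is queried."""

    def __init__(self, states):
        self.states = states
        self.lookups = 0


class FakeAsyncResult(object):
    """Hypothetical stand-in for celery.result.AsyncResult (not Celery's code)."""

    def __init__(self, backend, task_id):
        self._backend = backend
        self._task_id = task_id

    @property
    def state(self):
        # Assumption for this sketch: every read of `.state` queries the backend.
        self._backend.lookups += 1
        return self._backend.states.get(self._task_id, 'PENDING')


def fetch_task_state(task):
    key, async_result = task
    # Reading `.state` is the expensive step: it looks like a simple attribute
    # access but performs the backend query, which is why a comment helps.
    return key, async_result.state


backend = FakeBackend({'id-1': 'SUCCESS'})
result = fetch_task_state(('task-key', FakeAsyncResult(backend, 'id-1')))
```

With a comment like the one in `fetch_task_state`, a reader can see why the function is worth running in a subprocess pool even though its body is a single tuple construction.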