This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bb2a8f89f2 D205 Support - Executors (#32213)
bb2a8f89f2 is described below
commit bb2a8f89f274cf0c5a4f5f8f2ce64094f9a21dab
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 27 12:56:52 2023 -0700
D205 Support - Executors (#32213)
---
airflow/executors/base_executor.py | 13 +++++--------
airflow/executors/celery_executor.py | 1 +
airflow/executors/celery_executor_utils.py | 1 +
airflow/executors/celery_kubernetes_executor.py | 1 +
airflow/executors/kubernetes_executor.py | 8 +++-----
airflow/executors/local_executor.py | 4 ++--
airflow/executors/local_kubernetes_executor.py | 4 ++--
7 files changed, 15 insertions(+), 17 deletions(-)
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 9599adabdf..1a44540b07 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -82,10 +82,7 @@ class RunningRetryAttemptType:
return (pendulum.now("UTC") - self.first_attempt_time).total_seconds()
def can_try_again(self):
- """
- If there has been at least one try greater than MIN_SECONDS after first attempt,
- then return False. Otherwise, return True.
- """
+ """Return False if there has been at least one try greater than MIN_SECONDS, otherwise return True."""
if self.tries_after_min > 0:
return False
@@ -100,11 +97,9 @@ class RunningRetryAttemptType:
class BaseExecutor(LoggingMixin):
"""
- Class to derive in order to implement concrete executors.
- Such as, Celery, Kubernetes, Local, Sequential and the likes.
+ Base class to inherit for concrete executors such as Celery, Kubernetes, Local, Sequential, etc.
- :param parallelism: how many jobs should run at one time. Set to
- ``0`` for infinity.
+ :param parallelism: how many jobs should run at one time. Set to ``0`` for infinity.
"""
supports_ad_hoc_ti_run: bool = False
@@ -201,6 +196,7 @@ class BaseExecutor(LoggingMixin):
def sync(self) -> None:
"""
Sync will get called periodically by the heartbeat method.
+
Executors should override this to perform gather statuses.
"""
@@ -390,6 +386,7 @@ class BaseExecutor(LoggingMixin):
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
"""
Handle remnants of tasks that were failed because they were stuck in queued.
+
Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index de59804a04..b3a011c815 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -295,6 +295,7 @@ class CeleryExecutor(BaseExecutor):
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
+
Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.
diff --git a/airflow/executors/celery_executor_utils.py b/airflow/executors/celery_executor_utils.py
index 6330b62c6c..49112bba6a 100644
--- a/airflow/executors/celery_executor_utils.py
+++ b/airflow/executors/celery_executor_utils.py
@@ -16,6 +16,7 @@
# under the License.
"""
Utilities and classes used by the Celery Executor.
+
Much of this code is expensive to import/load, be careful where this module is imported.
"""
diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py
index 77dc14753f..d528c43fe5 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
class CeleryKubernetesExecutor(LoggingMixin):
"""
CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.
+
It chooses an executor to use based on the queue defined on the task.
When the queue is the value of ``kubernetes_queue`` in section ``[celery_kubernetes_executor]``
of the configuration (default value: `kubernetes`), KubernetesExecutor is selected to run the task,
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 8707bf9699..dd1868a192 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -432,12 +432,9 @@ class AirflowKubernetesScheduler(LoggingMixin):
def sync(self) -> None:
"""
- The sync function checks the status of all currently running kubernetes jobs.
- If a job is completed, its status is placed in the result queue to
- be sent back to the scheduler.
-
- :return:
+ Checks the status of all currently running kubernetes jobs.
+ If a job is completed, its status is placed in the result queue to be sent back to the scheduler.
"""
self.log.debug("Syncing KubernetesExecutor")
self._health_check_kube_watchers()
@@ -880,6 +877,7 @@ class KubernetesExecutor(BaseExecutor):
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
+
Tasks can get stuck in queued. If such a task is detected, it will be marked
as `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
if it doesn't.
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 715bdb42ae..ca54a387c8 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -203,8 +203,8 @@ class QueuedLocalWorker(LocalWorkerBase):
class LocalExecutor(BaseExecutor):
"""
LocalExecutor executes tasks locally in parallel.
- It uses the multiprocessing Python library and queues to parallelize the execution
- of tasks.
+
+ It uses the multiprocessing Python library and queues to parallelize the execution of tasks.
:param parallelism: how many parallel processes are run in the executor
"""
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
index 050132c1a0..cc95232c91 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -33,8 +33,8 @@ if TYPE_CHECKING:
class LocalKubernetesExecutor(LoggingMixin):
"""
- LocalKubernetesExecutor consists of LocalExecutor and KubernetesExecutor.
- It chooses the executor to use based on the queue defined on the task.
+ Chooses between LocalExecutor and KubernetesExecutor based on the queue defined on the task.
+
When the task's queue is the value of ``kubernetes_queue`` in section ``[local_kubernetes_executor]``
of the configuration (default value: `kubernetes`), KubernetesExecutor is selected to run the task,
otherwise, LocalExecutor is used.