This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bb2a8f89f2 D205 Support - Executors (#32213)
bb2a8f89f2 is described below

commit bb2a8f89f274cf0c5a4f5f8f2ce64094f9a21dab
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 27 12:56:52 2023 -0700

    D205 Support - Executors (#32213)
---
 airflow/executors/base_executor.py              | 13 +++++--------
 airflow/executors/celery_executor.py            |  1 +
 airflow/executors/celery_executor_utils.py      |  1 +
 airflow/executors/celery_kubernetes_executor.py |  1 +
 airflow/executors/kubernetes_executor.py        |  8 +++-----
 airflow/executors/local_executor.py             |  4 ++--
 airflow/executors/local_kubernetes_executor.py  |  4 ++--
 7 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/airflow/executors/base_executor.py 
b/airflow/executors/base_executor.py
index 9599adabdf..1a44540b07 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -82,10 +82,7 @@ class RunningRetryAttemptType:
         return (pendulum.now("UTC") - self.first_attempt_time).total_seconds()
 
     def can_try_again(self):
-        """
-        If there has been at least one try greater than MIN_SECONDS after 
first attempt,
-        then return False.  Otherwise, return True.
-        """
+        """Return False if there has been at least one try greater than 
MIN_SECONDS, otherwise return True."""
         if self.tries_after_min > 0:
             return False
 
@@ -100,11 +97,9 @@ class RunningRetryAttemptType:
 
 class BaseExecutor(LoggingMixin):
     """
-    Class to derive in order to implement concrete executors.
-    Such as, Celery, Kubernetes, Local, Sequential and the likes.
+    Base class to inherit for concrete executors such as Celery, Kubernetes, 
Local, Sequential, etc.
 
-    :param parallelism: how many jobs should run at one time. Set to
-        ``0`` for infinity.
+    :param parallelism: how many jobs should run at one time. Set to ``0`` for 
infinity.
     """
 
     supports_ad_hoc_ti_run: bool = False
@@ -201,6 +196,7 @@ class BaseExecutor(LoggingMixin):
     def sync(self) -> None:
         """
         Sync will get called periodically by the heartbeat method.
+
         Executors should override this to gather statuses.
         """
 
@@ -390,6 +386,7 @@ class BaseExecutor(LoggingMixin):
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
         """
         Handle remnants of tasks that were failed because they were stuck in 
queued.
+
         Tasks can get stuck in queued. If such a task is detected, it will be 
marked
         as `UP_FOR_RETRY` if the task instance has remaining retries or marked 
as `FAILED`
         if it doesn't.
diff --git a/airflow/executors/celery_executor.py 
b/airflow/executors/celery_executor.py
index de59804a04..b3a011c815 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -295,6 +295,7 @@ class CeleryExecutor(BaseExecutor):
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
         Handle remnants of tasks that were failed because they were stuck in 
queued.
+
         Tasks can get stuck in queued. If such a task is detected, it will be 
marked
         as `UP_FOR_RETRY` if the task instance has remaining retries or marked 
as `FAILED`
         if it doesn't.
diff --git a/airflow/executors/celery_executor_utils.py 
b/airflow/executors/celery_executor_utils.py
index 6330b62c6c..49112bba6a 100644
--- a/airflow/executors/celery_executor_utils.py
+++ b/airflow/executors/celery_executor_utils.py
@@ -16,6 +16,7 @@
 # under the License.
 """
 Utilities and classes used by the Celery Executor.
+
 Much of this code is expensive to import/load, be careful where this module is 
imported.
 """
 
diff --git a/airflow/executors/celery_kubernetes_executor.py 
b/airflow/executors/celery_kubernetes_executor.py
index 77dc14753f..d528c43fe5 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
 class CeleryKubernetesExecutor(LoggingMixin):
     """
     CeleryKubernetesExecutor consists of CeleryExecutor and KubernetesExecutor.
+
     It chooses an executor to use based on the queue defined on the task.
     When the queue is the value of ``kubernetes_queue`` in section 
``[celery_kubernetes_executor]``
     of the configuration (default value: `kubernetes`), KubernetesExecutor is 
selected to run the task,
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 8707bf9699..dd1868a192 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -432,12 +432,9 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def sync(self) -> None:
         """
-        The sync function checks the status of all currently running 
kubernetes jobs.
-        If a job is completed, its status is placed in the result queue to
-        be sent back to the scheduler.
-
-        :return:
+        Checks the status of all currently running kubernetes jobs.
 
+        If a job is completed, its status is placed in the result queue to be 
sent back to the scheduler.
         """
         self.log.debug("Syncing KubernetesExecutor")
         self._health_check_kube_watchers()
@@ -880,6 +877,7 @@ class KubernetesExecutor(BaseExecutor):
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
         """
         Handle remnants of tasks that were failed because they were stuck in 
queued.
+
         Tasks can get stuck in queued. If such a task is detected, it will be 
marked
         as `UP_FOR_RETRY` if the task instance has remaining retries or marked 
as `FAILED`
         if it doesn't.
diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index 715bdb42ae..ca54a387c8 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -203,8 +203,8 @@ class QueuedLocalWorker(LocalWorkerBase):
 class LocalExecutor(BaseExecutor):
     """
     LocalExecutor executes tasks locally in parallel.
-    It uses the multiprocessing Python library and queues to parallelize the 
execution
-    of tasks.
+
+    It uses the multiprocessing Python library and queues to parallelize the 
execution of tasks.
 
     :param parallelism: how many parallel processes are run in the executor
     """
diff --git a/airflow/executors/local_kubernetes_executor.py 
b/airflow/executors/local_kubernetes_executor.py
index 050132c1a0..cc95232c91 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -33,8 +33,8 @@ if TYPE_CHECKING:
 
 class LocalKubernetesExecutor(LoggingMixin):
     """
-    LocalKubernetesExecutor consists of LocalExecutor and KubernetesExecutor.
-    It chooses the executor to use based on the queue defined on the task.
+    Chooses between LocalExecutor and KubernetesExecutor based on the queue 
defined on the task.
+
     When the task's queue is the value of ``kubernetes_queue`` in section 
``[local_kubernetes_executor]``
     of the configuration (default value: `kubernetes`), KubernetesExecutor is 
selected to run the task,
     otherwise, LocalExecutor is used.

Reply via email to