uranusjr commented on code in PR #27969:
URL: https://github.com/apache/airflow/pull/27969#discussion_r1040567143


##########
airflow/executors/celery_executor.py:
##########
@@ -540,6 +541,7 @@ def _set_celery_pending_task_timeout(
         self, key: TaskInstanceKey, timeout_type: _CeleryPendingTaskTimeoutType | None
     ) -> None:
         """
+        Set pending task timeout.

Review Comment:
   ```suggestion
           Set pending task timeout.
   
   ```



##########
airflow/executors/celery_kubernetes_executor.py:
##########
@@ -52,40 +52,44 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from celery and kubernetes executor"""
+        """Return queued tasks from celery and kubernetes executor."""
         queued_tasks = self.celery_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from celery and kubernetes executor"""
+        """Return running tasks from celery and kubernetes executor."""
         return self.celery_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> int | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.

Review Comment:
   ```suggestion
           Inherited attribute from BaseExecutor.
   
   ```



##########
airflow/executors/celery_kubernetes_executor.py:
##########
@@ -52,40 +52,44 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from celery and kubernetes executor"""
+        """Return queued tasks from celery and kubernetes executor."""
         queued_tasks = self.celery_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from celery and kubernetes executor"""
+        """Return running tasks from celery and kubernetes executor."""
         return self.celery_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> int | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.
         """
         return self._job_id
 
     @job_id.setter
     def job_id(self, value: int | None) -> None:
-        """job_id is manipulated by SchedulerJob.  We must propagate the 
job_id to wrapped executors."""
+        """
+        job_id is manipulated by SchedulerJob.
+        We must propagate the job_id to wrapped executors.
+        """

Review Comment:
   ```suggestion
           """Expose job ID for SchedulerJob."""
   ```
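
As an aside on the pattern under review: the wrapper keeps its own `_job_id` and fans writes out to the wrapped executors. Here is a minimal sketch of that property/setter idea, using hypothetical stand-in classes rather than the actual Airflow executors:

```python
# Hypothetical stand-ins; not the real Airflow classes.
class _InnerExecutor:
    job_id = None


class _WrapperExecutor:
    """Wraps two executors and propagates attribute writes to both."""

    def __init__(self, first, second):
        self._first = first
        self._second = second
        self._job_id = None

    @property
    def job_id(self):
        # Reads come from the wrapper's own copy of the value.
        return self._job_id

    @job_id.setter
    def job_id(self, value):
        # Writes propagate so every wrapped executor sees the same ID.
        self._job_id = value
        self._first.job_id = value
        self._second.job_id = value


wrapper = _WrapperExecutor(_InnerExecutor(), _InnerExecutor())
wrapper.job_id = 42
assert wrapper._first.job_id == wrapper._second.job_id == 42
```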



##########
airflow/executors/local_executor.py:
##########
@@ -50,8 +50,8 @@
 
 class LocalWorkerBase(Process, LoggingMixin):
     """
-    LocalWorkerBase implementation to run airflow commands. Executes the given
-    command and puts the result into a result queue when done, terminating execution.
+    LocalWorkerBase implementation to run airflow commands.
+    Executes the given command and puts the result into a result queue when done, terminating execution.

Review Comment:
   ```suggestion
       LocalWorkerBase implementation to run airflow commands.
   
    Executes the given command and puts the result into a result queue when done, terminating execution.
   ```



##########
airflow/executors/local_executor.py:
##########
@@ -263,18 +262,14 @@ def sync(self) -> None:
                 self.executor.workers_active -= 1
 
         def end(self) -> None:
-            """
-            This method is called when the caller is done submitting job and
-            wants to wait synchronously for the job submitted previously to be
-            all done.
-            """
+            """Wait synchronously for the previously submitted job to 
complete."""
             while self.executor.workers_active > 0:
                 self.executor.sync()
 
     class LimitedParallelism:
         """
-        Implements LocalExecutor with limited parallelism using a task queue to
-        coordinate work distribution.
+        Implements LocalExecutor with limited parallelism.
+        Uses a task queue to coordinate work distribution.

Review Comment:
   ```suggestion
           Implements LocalExecutor with limited parallelism.
   
           Uses a task queue to coordinate work distribution.
   ```
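
For context, the "task queue to coordinate work distribution" idea works roughly as follows. This is a minimal sketch using threads and a poison-pill shutdown (the real executor uses processes; the names here are illustrative only):

```python
import queue
import threading

task_queue = queue.Queue()


def worker():
    while True:
        command = task_queue.get()
        if command is None:  # poison pill: stop this worker
            task_queue.task_done()
            break
        print(f"running: {command}")  # stand-in for executing the command
        task_queue.task_done()


parallelism = 2
workers = [threading.Thread(target=worker) for _ in range(parallelism)]
for w in workers:
    w.start()
for cmd in ["task_a", "task_b", "task_c"]:
    task_queue.put(cmd)
for _ in workers:
    task_queue.put(None)  # one pill per worker
task_queue.join()  # block until every queued item has been processed
```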



##########
airflow/executors/local_kubernetes_executor.py:
##########
@@ -52,41 +52,45 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from local and kubernetes executor"""
+        """Return queued tasks from local and kubernetes executor."""
         queued_tasks = self.local_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from local and kubernetes executor"""
+        """Return running tasks from local and kubernetes executor."""
         return self.local_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> str | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.

Review Comment:
   ```suggestion
           Inherited attribute from BaseExecutor.
   
           Since this is not really an executor, but a wrapper of executors
           we implemented it as property, so we can have custom setter.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -67,13 +67,13 @@ class BackfillJob(BaseJob):
     @attr.define
     class _DagRunTaskStatus:
         """
-        Internal status of the backfill job. This class is intended to be instantiated
-        only within a BackfillJob instance and will track the execution of tasks,
-        e.g. running, skipped, succeeded, failed, etc. Information about the dag runs
-        related to the backfill job are also being tracked in this structure,
-        .e.g finished runs, etc. Any other status related information related to the
-        execution of dag runs / tasks can be included in this structure since it makes
-        it easier to pass it around.
+        Internal status of the backfill job.

Review Comment:
   ```suggestion
           Internal status of the backfill job.
   
   ```



##########
airflow/executors/local_executor.py:
##########
@@ -162,8 +162,8 @@ def do_work(self) -> None:
 
 class QueuedLocalWorker(LocalWorkerBase):
     """
-    LocalWorker implementation that is waiting for tasks from a queue and will
-    continue executing commands as they become available in the queue.
+    LocalWorker implementation that is waiting for tasks from a queue.
+    Will continue executing commands as they become available in the queue.

Review Comment:
   ```suggestion
       LocalWorker implementation that is waiting for tasks from a queue.
   
       Will continue executing commands as they become available in the queue.
   ```



##########
airflow/executors/sequential_executor.py:
##########
@@ -34,9 +34,10 @@
 
 class SequentialExecutor(BaseExecutor):
     """
-    This executor will only run one task instance at a time, can be used
-    for debugging. It is also the only executor that can be used with sqlite
-    since sqlite doesn't support multiple connections.
+    This executor will only run one task instance at a time.
+    It can be used for debugging. It is also the only executor
+    that can be used with sqlite since sqlite doesn't support
+    multiple connections.

Review Comment:
   ```suggestion
       This executor will only run one task instance at a time.
   
       It can be used for debugging. It is also the only executor
       that can be used with sqlite since sqlite doesn't support
       multiple connections.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -162,8 +163,8 @@ def __init__(
 
     def _update_counters(self, ti_status, session=None):
         """
-        Updates the counters per state of the tasks that were running. Can re-add
-        to tasks to run in case required.
+        Updates the counters per state of the tasks that were running.

Review Comment:
   ```suggestion
           Updates the counters per state of the tasks that were running.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -122,6 +122,7 @@ def __init__(
         **kwargs,
     ):
         """
+        Create a BackfillJob.

Review Comment:
   ```suggestion
           Create a BackfillJob.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -413,9 +411,9 @@ def _process_backfill_task_instances(
         session=None,
     ) -> list:
         """
-        Process a set of task instances from a set of dag runs. Special handling is done
-        to account for different task instance states that could be present when running
-        them in a backfill process.
+        Process a set of task instances from a set of dag runs.
+        Special handling is done to account for different task instance states
+        that could be present when running them in a backfill process.

Review Comment:
   ```suggestion
           Process a set of task instances from a set of DAG runs.
   
        Special handling is done to account for different task instance states
           that could be present when running them in a backfill process.
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -291,9 +291,8 @@ def _iter_task_needing_expansion() -> Iterator[AbstractOperator]:
     @provide_session
     def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = None):
         """
-        Returns a dag run for the given run date, which will be matched to an existing
-        dag run if available or create a new dag run otherwise. If the max_active_runs
-        limit is reached, this function will return None.
+        Return an existing dag run for the given run date or create one.

Review Comment:
   ```suggestion
           Return an existing dag run for the given run date or create one.
   
   ```
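
The behavior being documented here is a get-or-create with a cap: match an existing dag run, otherwise create one, and return None once max_active_runs is reached. A hedged sketch of that logic with hypothetical helpers (not the Airflow API):

```python
def get_or_create_run(existing_runs: dict, run_date, active_limit: int):
    """Return the run for run_date, creating it unless the cap is hit."""
    if run_date in existing_runs:
        return existing_runs[run_date]  # matched an existing dag run
    if len(existing_runs) >= active_limit:
        return None  # max_active_runs limit reached
    run = {"date": run_date, "state": "queued"}  # create a new dag run
    existing_runs[run_date] = run
    return run
```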



##########
airflow/jobs/backfill_job.py:
##########
@@ -901,6 +895,7 @@ def _execute(self, session=None):
     @provide_session
     def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None) -> int | None:
         """
+        Reset state of orphaned tasks.

Review Comment:
   ```suggestion
           Reset state of orphaned tasks.
   
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -85,17 +85,18 @@
 
 def _is_parent_process() -> bool:
     """
-    Returns True if the current process is the parent process. False if the current process is a child
-    process started by multiprocessing.
+    Whether this is a parent process.
+    Return True if the current process is the parent process.
+    False if the current process is a child process started by multiprocessing.
     """
     return multiprocessing.current_process().name == "MainProcess"
 
 
 class SchedulerJob(BaseJob):
     """
-    This SchedulerJob runs for a specific time interval and schedules the jobs
-    that are ready to run. It figures out the latest runs for each
-    task and sees if the dependencies for the next schedules are met.
+    SchedulerJob runs for a specific time interval and schedules jobs that are ready to run.
+    It figures out the latest runs for each task and sees if the dependencies
+    for the next schedules are met.

Review Comment:
   ```suggestion
    SchedulerJob runs for a specific time interval and schedules jobs that are ready to run.
   
       It figures out the latest runs for each task and sees if the dependencies
       for the next schedules are met.
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -241,8 +242,12 @@ def __get_concurrency_maps(
 
     def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -> list[TI]:
         """
-        Finds TIs that are ready for execution with respect to pool limits,
-        dag max_active_tasks, executor state, and priority.
+        Find TIs that are ready for execution based on conditions.
+        Conditions include:
+        - pool limits
+        - dag max_active_tasks
+        - executor state
+        - priority

Review Comment:
   ```suggestion
           Find TIs that are ready for execution based on conditions.
   
           Conditions include:
           - pool limits
           - DAG max_active_tasks
           - executor state
           - priority
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -85,17 +85,18 @@
 
 def _is_parent_process() -> bool:
     """
-    Returns True if the current process is the parent process. False if the current process is a child
-    process started by multiprocessing.
+    Whether this is a parent process.
+    Return True if the current process is the parent process.
+    False if the current process is a child process started by multiprocessing.

Review Comment:
   ```suggestion
       Whether this is a parent process.
   
       Return True if the current process is the parent process.
        False if the current process is a child process started by multiprocessing.
   ```
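
The check itself relies on multiprocessing's default process naming: the original interpreter process is named "MainProcess", while spawned children get names like "Process-1". A small runnable demonstration (standalone script for illustration, not part of the PR):

```python
import multiprocessing


def _is_parent_process() -> bool:
    return multiprocessing.current_process().name == "MainProcess"


def _report():
    # Runs in the child: prints something like "Process-1 False".
    print(multiprocessing.current_process().name, _is_parent_process())


if __name__ == "__main__":
    # Runs in the parent: prints "MainProcess True".
    print(multiprocessing.current_process().name, _is_parent_process())
    child = multiprocessing.Process(target=_report)
    child.start()
    child.join()
```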



##########
airflow/jobs/scheduler_job.py:
##########
@@ -816,7 +820,8 @@ def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION)
 
     def _run_scheduler_loop(self) -> None:
         """
-        The actual scheduler loop. The main steps in the loop are:
+        The actual scheduler loop.
+        The main steps in the loop are:

Review Comment:
   ```suggestion
           The actual scheduler loop.
   
           The main steps in the loop are:
   ```



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -447,6 +448,7 @@ def __init__(self):
     @provide_session
     def clear_not_launched_queued_tasks(self, session=None) -> None:
         """
+        Clear tasks that were not yet launched, but were previously queued.

Review Comment:
   ```suggestion
           Clear tasks that were not yet launched, but were previously queued.
   
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -55,8 +55,8 @@
 
 class BackfillJob(BaseJob):
     """
-    A backfill job consists of a dag or subdag for a specific time range. It
-    triggers a set of task instance runs, in the right order and lasts for
+    A backfill job consists of a dag or subdag for a specific time range.
+    It triggers a set of task instance runs, in the right order and lasts for

Review Comment:
   ```suggestion
       A backfill job consists of a dag or subdag for a specific time range.
   
       It triggers a set of task instance runs, in the right order and lasts for
   ```



##########
airflow/executors/local_kubernetes_executor.py:
##########
@@ -52,41 +52,45 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
 
     @property
     def queued_tasks(self) -> dict[TaskInstanceKey, QueuedTaskInstanceType]:
-        """Return queued tasks from local and kubernetes executor"""
+        """Return queued tasks from local and kubernetes executor."""
         queued_tasks = self.local_executor.queued_tasks.copy()
         queued_tasks.update(self.kubernetes_executor.queued_tasks)
 
         return queued_tasks
 
     @property
     def running(self) -> set[TaskInstanceKey]:
-        """Return running tasks from local and kubernetes executor"""
+        """Return running tasks from local and kubernetes executor."""
         return self.local_executor.running.union(self.kubernetes_executor.running)
 
     @property
     def job_id(self) -> str | None:
         """
-        This is a class attribute in BaseExecutor but since this is not really an executor, but a wrapper
-        of executors we implement as property so we can have custom setter.
+        Inherited attribute from BaseExecutor.
+        Since this is not really an executor, but a wrapper of executors
+        we implemented it as property, so we can have custom setter.
         """
         return self._job_id
 
     @job_id.setter
     def job_id(self, value: str | None) -> None:
-        """job_id is manipulated by SchedulerJob.  We must propagate the 
job_id to wrapped executors."""
+        """
+        job_id is manipulated by SchedulerJob.
+        We must propagate the job_id to wrapped executors.
+        """

Review Comment:
   ```suggestion
           """Expose job ID for SchedulerJob."""
   ```



##########
airflow/jobs/backfill_job.py:
##########
@@ -733,8 +731,7 @@ def _get_dag_with_subdags(self) -> list[DAG]:
     @provide_session
     def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
         """
-        Computes the dag runs and their respective task instances for
-        the given run dates and executes the task instances.
+        Compute and execute dag runs and their respective task instances for the given dates.

Review Comment:
   ```suggestion
        Compute and execute dag runs and their respective task instances for the given dates.
   
   ```



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -413,6 +413,7 @@ def terminate(self) -> None:
 
 def get_base_pod_from_template(pod_template_file: str | None, kube_config: Any) -> k8s.V1Pod:
     """
+    Get base pod from template.

Review Comment:
   ```suggestion
       Get base pod from template.
   
   ```



##########
airflow/jobs/base_job.py:
##########
@@ -49,10 +49,10 @@ def _resolve_dagrun_model():
 
 class BaseJob(Base, LoggingMixin):
     """
-    Abstract class to be derived for jobs. Jobs are processing items with state
-    and duration that aren't task instances. For instance a BackfillJob is
-    a collection of task instance runs, but should have its own state, start
-    and end time.
+    Abstract class to be derived for jobs.
+    Jobs are processing items with state and duration that aren't task instances.
+    For instance a BackfillJob is a collection of task instance runs,
+    but should have its own state, start and end time.

Review Comment:
   ```suggestion
       Abstract class to be derived for jobs.
   
    Jobs are processing items with state and duration that aren't task instances.
       For instance a BackfillJob is a collection of task instance runs,
       but should have its own state, start and end time.
   ```


