hussein-awala commented on code in PR #36240:
URL: https://github.com/apache/airflow/pull/36240#discussion_r1428952107


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -471,6 +471,15 @@ def _change_state(
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == TaskInstanceState.ADOPTED:
+            # When the task pod is adopted by another scheduler,
+            # then remove the task from the current scheduler running queue.

Review Comment:
   Better to use `executor` instead of `scheduler`:
   ```suggestion
               # When the task pod is adopted by another executor,
               # then remove the task from the current executor running queue.
   ```



##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -220,7 +220,15 @@ def process_status(
         pod = event["object"]
         annotations_string = annotations_for_logging_task_metadata(annotations)
         """Process status response."""
-        if status == "Pending":
+        if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:

Review Comment:
   Nice one!



##########
airflow/utils/state.py:
##########
@@ -46,6 +46,9 @@ class TaskInstanceState(str, Enum):
     REMOVED = "removed"  # Task vanished from DAG before it ran
     SCHEDULED = "scheduled"  # Task should run and will be handed to executor 
soon
 
+    # Set by executor
+    ADOPTED = "adopted"
+

Review Comment:
   KubernetesExecutor is a part of the `cncf.kuberntes` provider, which could 
be used with the old version of Airlfow (the current min airflow version for 
providers is 2.6.0), which means that this state could be unavailable for the 
users who use the latest version of k8s provider with an old version of Airflow.
   
   Also, the TI will never have this state in the metadata, so maybe you can 
find another better way to tell the executor to free a slot.



##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##########
@@ -220,7 +220,15 @@ def process_status(
         pod = event["object"]
         annotations_string = annotations_for_logging_task_metadata(annotations)
         """Process status response."""
-        if status == "Pending":
+        if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
+            # This will happen only when the task pods are adopted by another 
scheduler.

Review Comment:
   ```suggestion
               # This will happen only when the task pods are adopted by 
another executor.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to