uranusjr commented on code in PR #28871:
URL: https://github.com/apache/airflow/pull/28871#discussion_r1067709600
##########
airflow/executors/kubernetes_executor.py:
##########
@@ -751,19 +751,25 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
if TYPE_CHECKING:
assert self.kube_scheduler
- if state != State.RUNNING:
- if self.kube_config.delete_worker_pods:
- if state != State.FAILED or
self.kube_config.delete_worker_pods_on_failure:
- self.kube_scheduler.delete_pod(pod_id, namespace)
- self.log.info("Deleted pod: %s in namespace %s", str(key),
str(namespace))
- else:
- self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id,
namespace=namespace)
- self.log.info("Patched pod %s in namespace %s to mark it as
done", str(key), str(namespace))
- try:
- self.running.remove(key)
- except KeyError:
- self.log.debug("Could not find key: %s", str(key))
- self.event_buffer[key] = state, None
+ if state == State.RUNNING:
+ self.event_buffer[key] = state, None
+ return
+
+ if self.kube_config.delete_worker_pods:
+ if state != State.FAILED or
self.kube_config.delete_worker_pods_on_failure:
+ self.kube_scheduler.delete_pod(pod_id, namespace)
+ self.log.info("Deleted pod: %s in namespace %s", str(key),
str(namespace))
+ else:
+ self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id,
namespace=namespace)
+ self.log.info("Patched pod %s in namespace %s to mark it as done",
str(key), str(namespace))
+
+ if key in self.running:
+ self.running.remove(key)
+ # We do get multiple events once the pod hits a terminal state,
and we only want to
+ # do this once, so only do it when we remove the task from running
+ self.event_buffer[key] = state, None
+ else:
+ self.log.debug("TI key not in running, not adding to event_buffer:
%s", str(key))
Review Comment:
```suggestion
try:
    self.running.remove(key)
except KeyError:
    self.log.debug("TI key not in running, not adding to event_buffer: %s", str(key))
else:
    # We get multiple events once the pod hits a terminal state, and we only want to
    # do this once, so only do it when we remove the task from running
    self.event_buffer[key] = state, None
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]