dstandish commented on a change in pull request #20381:
URL: https://github.com/apache/airflow/pull/20381#discussion_r771851298
##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -321,19 +322,24 @@ def _exec_pod_command(self, resp, command: str) -> None:
break
return None
- def process_status(self, job_id: str, status: str) -> str:
- """Process status information for the JOB"""
- status = status.lower()
- if status == PodStatus.PENDING:
+ def process_status(self, pod_name: str, pod_phase: str) -> TaskInstanceState:
+ """
+ Convert a K8S Pod phase to Airflow task state.
+ :param pod_name: Name of the pod
+ :param pod_phase: Phase of the pod
+ :return: Airflow task state corresponding to the pod phase
+ """
+ pod_phase = pod_phase.lower()
+ if pod_phase == PodStatus.PENDING:
return State.QUEUED
Review comment:
> I looked at this too and was confused on the direction.
> I know mypy expects the return value to be TaskInstanceState, but
> State.QUEUED is a member of State class with the field assigned to enum in
> TaskInstanceState, @potiuk had suggested to use State(State.QUEUED).
This is one of the things I address in
https://github.com/apache/airflow/pull/19572.
Currently with KPO, we sort of mix up pod phase and task state. We convert
pod phase to task state, then look at this converted state to _infer_
conclusions about pod phase. In #19572 I don't convert to state any more and
it's much simpler this way.
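To illustrate the difference, here is a rough, self-contained sketch. It is not the code from either PR: `PodPhase`, `TaskState`, `phase_to_state`, `pod_is_done_via_state` and `pod_is_done` are hypothetical names made up for this example, and `TaskState` is only a stand-in for Airflow's `TaskInstanceState`. The phase values are the ones Kubernetes reports in `status.phase`.

```python
from enum import Enum
from typing import Optional


class PodPhase(str, Enum):
    """Pod phases as reported by Kubernetes in status.phase."""

    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"


class TaskState(str, Enum):
    """Hypothetical stand-in for Airflow's TaskInstanceState."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Indirect approach: translate the pod phase into a task state first.
_PHASE_TO_STATE = {
    PodPhase.PENDING: TaskState.QUEUED,
    PodPhase.RUNNING: TaskState.RUNNING,
    PodPhase.SUCCEEDED: TaskState.SUCCESS,
    PodPhase.FAILED: TaskState.FAILED,
}


def phase_to_state(phase: str) -> Optional[TaskState]:
    """Map a pod phase to a task state (None when there is no mapping)."""
    return _PHASE_TO_STATE.get(PodPhase(phase))


def pod_is_done_via_state(phase: str) -> bool:
    """Infer a fact about the pod from the converted task state."""
    return phase_to_state(phase) in (TaskState.SUCCESS, TaskState.FAILED)


# Direct approach: answer the question from the pod phase itself.
def pod_is_done(phase: str) -> bool:
    """Check the pod phase without going through task state."""
    return PodPhase(phase) in (PodPhase.SUCCEEDED, PodPhase.FAILED)
```

Both helpers give the same answer here, but the direct one has no second vocabulary to keep in sync, and the question of which type `process_status` should return goes away.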