jedcunningham commented on a change in pull request #15490:
URL: https://github.com/apache/airflow/pull/15490#discussion_r650056775
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -358,15 +358,14 @@ def execute(self, context) -> Optional[str]:
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context,
pod_list.items[0])
- final_state, result = self.handle_pod_overlap(
+ final_state, pod_info, result = self.handle_pod_overlap(
labels, try_numbers_match, launcher, pod_list.items[0]
)
else:
self.log.info("creating pod with labels %s and launcher %s",
labels, launcher)
- final_state, _, result =
self.create_new_pod_for_operator(labels, launcher)
+ final_state, _, pod_info, result =
self.create_new_pod_for_operator(labels, launcher)
Review comment:
```suggestion
final_state, _, pod, result =
self.create_new_pod_for_operator(labels, launcher)
```
I think I like `pod` better than `pod_info`? Only doing this comment once,
but applies everywhere.
##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -129,7 +129,7 @@ def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
raise AirflowException("Pod took too long to start")
time.sleep(1)
- def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State,
Optional[str]]:
+ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod,
Optional[str]]:
"""
Monitors a pod and returns the final state
Review comment:
```suggestion
Monitors a pod and returns the final state, pod, and xcom result
```
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -545,17 +544,17 @@ def monitor_launched_pod(self, launcher, pod) ->
Tuple[State, Optional[str]]:
:return:
"""
try:
- (final_state, result) = launcher.monitor_pod(pod,
get_logs=self.get_logs)
+ (final_state, pod_info, result) = launcher.monitor_pod(pod,
get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)
if final_state != State.SUCCESS:
if self.log_events_on_failure:
- for event in launcher.read_pod_events(pod).items:
+ for event in pod_info.items:
Review comment:
```suggestion
for event in launcher.read_pod_events(pod).items:
```
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -358,15 +358,14 @@ def execute(self, context) -> Optional[str]:
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context,
pod_list.items[0])
- final_state, result = self.handle_pod_overlap(
+ final_state, pod_info, result = self.handle_pod_overlap(
Review comment:
```suggestion
final_state, _, result = self.handle_pod_overlap(
```
Since we don't use it?
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -518,17 +517,17 @@ def create_new_pod_for_operator(self, labels, launcher)
-> Tuple[State, k8s.V1Po
self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
try:
launcher.start_pod(self.pod,
startup_timeout=self.startup_timeout_seconds)
- final_state, result = launcher.monitor_pod(pod=self.pod,
get_logs=self.get_logs)
+ final_state, pod_info, result = launcher.monitor_pod(pod=self.pod,
get_logs=self.get_logs)
except AirflowException:
if self.log_events_on_failure:
- for event in launcher.read_pod_events(self.pod).items:
+ for event in pod_info.items:
self.log.error("Pod Event: %s - %s", event.reason,
event.message)
raise
finally:
if self.is_delete_operator_pod:
self.log.debug("Deleting pod for task %s", self.task_id)
launcher.delete_pod(self.pod)
- return final_state, self.pod, result
+ return final_state, self.pod, pod_info, result
Review comment:
Returning the pod twice?
```suggestion
return final_state, pod_info, result
```
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -518,17 +517,17 @@ def create_new_pod_for_operator(self, labels, launcher)
-> Tuple[State, k8s.V1Po
self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
try:
launcher.start_pod(self.pod,
startup_timeout=self.startup_timeout_seconds)
- final_state, result = launcher.monitor_pod(pod=self.pod,
get_logs=self.get_logs)
+ final_state, pod_info, result = launcher.monitor_pod(pod=self.pod,
get_logs=self.get_logs)
except AirflowException:
if self.log_events_on_failure:
- for event in launcher.read_pod_events(self.pod).items:
+ for event in pod_info.items:
Review comment:
```suggestion
for event in launcher.read_pod_events(self.pod).items:
```
I think we want to leave this as-is. We wont get a `pod(_info)` back, plus
`read_pod_events` just needs the namespace and name, which we can always use
`self.pod` for.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]