jedcunningham commented on a change in pull request #15490:
URL: https://github.com/apache/airflow/pull/15490#discussion_r650056775



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -358,15 +358,14 @@ def execute(self, context) -> Optional[str]:
 
             if len(pod_list.items) == 1:
                 try_numbers_match = self._try_numbers_match(context, 
pod_list.items[0])
-                final_state, result = self.handle_pod_overlap(
+                final_state, pod_info, result = self.handle_pod_overlap(
                     labels, try_numbers_match, launcher, pod_list.items[0]
                 )
             else:
                 self.log.info("creating pod with labels %s and launcher %s", 
labels, launcher)
-                final_state, _, result = 
self.create_new_pod_for_operator(labels, launcher)
+                final_state, _, pod_info, result = 
self.create_new_pod_for_operator(labels, launcher)

Review comment:
       ```suggestion
                   final_state, _, pod, result = 
self.create_new_pod_for_operator(labels, launcher)
   ```
   
   I think I like `pod` better than `pod_info`? Only doing this comment once, 
but applies everywhere.

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -129,7 +129,7 @@ def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
                     raise AirflowException("Pod took too long to start")
                 time.sleep(1)
 
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, 
Optional[str]]:
+    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, 
Optional[str]]:
         """
         Monitors a pod and returns the final state

Review comment:
       ```suggestion
           Monitors a pod and returns the final state, pod, and xcom result
   ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -545,17 +544,17 @@ def monitor_launched_pod(self, launcher, pod) -> 
Tuple[State, Optional[str]]:
         :return:
         """
         try:
-            (final_state, result) = launcher.monitor_pod(pod, 
get_logs=self.get_logs)
+            (final_state, pod_info, result) = launcher.monitor_pod(pod, 
get_logs=self.get_logs)
         finally:
             if self.is_delete_operator_pod:
                 launcher.delete_pod(pod)
         if final_state != State.SUCCESS:
             if self.log_events_on_failure:
-                for event in launcher.read_pod_events(pod).items:
+                for event in pod_info.items:

Review comment:
       ```suggestion
                   for event in launcher.read_pod_events(pod).items:
   ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -358,15 +358,14 @@ def execute(self, context) -> Optional[str]:
 
             if len(pod_list.items) == 1:
                 try_numbers_match = self._try_numbers_match(context, 
pod_list.items[0])
-                final_state, result = self.handle_pod_overlap(
+                final_state, pod_info, result = self.handle_pod_overlap(

Review comment:
       ```suggestion
                   final_state, _, result = self.handle_pod_overlap(
   ```
   Since we don't use it?

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -518,17 +517,17 @@ def create_new_pod_for_operator(self, labels, launcher) 
-> Tuple[State, k8s.V1Po
         self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
         try:
             launcher.start_pod(self.pod, 
startup_timeout=self.startup_timeout_seconds)
-            final_state, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
+            final_state, pod_info, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
         except AirflowException:
             if self.log_events_on_failure:
-                for event in launcher.read_pod_events(self.pod).items:
+                for event in pod_info.items:
                     self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
             raise
         finally:
             if self.is_delete_operator_pod:
                 self.log.debug("Deleting pod for task %s", self.task_id)
                 launcher.delete_pod(self.pod)
-        return final_state, self.pod, result
+        return final_state, self.pod, pod_info, result

Review comment:
       Returning the pod twice?
   ```suggestion
           return final_state, pod_info, result
   ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -518,17 +517,17 @@ def create_new_pod_for_operator(self, labels, launcher) 
-> Tuple[State, k8s.V1Po
         self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
         try:
             launcher.start_pod(self.pod, 
startup_timeout=self.startup_timeout_seconds)
-            final_state, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
+            final_state, pod_info, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
         except AirflowException:
             if self.log_events_on_failure:
-                for event in launcher.read_pod_events(self.pod).items:
+                for event in pod_info.items:

Review comment:
       ```suggestion
                   for event in launcher.read_pod_events(self.pod).items:
   ```
   
   I think we want to leave this as-is. We wont get a `pod(_info)` back, plus 
`read_pod_events` just needs the namespace and name, which we can always use 
`self.pod` for.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to