Nataneljpwd commented on code in PR #58931:
URL: https://github.com/apache/airflow/pull/58931#discussion_r2658935734


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -907,27 +907,39 @@ def extract_xcom_kill(self, pod: V1Pod):
                 _preload_content=False,
             )
         ) as resp:
-            self._exec_pod_command(resp, "kill -2 $(pgrep -u $(id -u) -f 
'sh')")
-
-    def _exec_pod_command(self, resp, command: str) -> str | None:
+            xcom_kill_command = "kill -2 $(pgrep -u $(id -u) -f 'sh')"
+            # fallback command for containers that don't support pgrep -u
+            fallback_xcom_kill_command = (
+                "for f in /proc/[0-9]*/comm; do "
+                "[ -O $f ] && read c < $f && [ \"$c\" = \"sh\" ] && 
pid=${f%/comm} && kill -2 ${pid##*/}; "
+                "done"
+            )
+            result, error = self._exec_pod_command(resp, xcom_kill_command)
+            
+            if error:
+                self.log.info("Primary kill command failed, trying fallback 
command")

Review Comment:
   In my opinion, this should be logged at warning level rather than info.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -907,27 +907,39 @@ def extract_xcom_kill(self, pod: V1Pod):
                 _preload_content=False,
             )
         ) as resp:
-            self._exec_pod_command(resp, "kill -2 $(pgrep -u $(id -u) -f 
'sh')")
-
-    def _exec_pod_command(self, resp, command: str) -> str | None:
+            xcom_kill_command = "kill -2 $(pgrep -u $(id -u) -f 'sh')"
+            # fallback command for containers that don't support pgrep -u
+            fallback_xcom_kill_command = (
+                "for f in /proc/[0-9]*/comm; do "
+                "[ -O $f ] && read c < $f && [ \"$c\" = \"sh\" ] && 
pid=${f%/comm} && kill -2 ${pid##*/}; "
+                "done"
+            )
+            result, error = self._exec_pod_command(resp, xcom_kill_command)
+            
+            if error:
+                self.log.info("Primary kill command failed, trying fallback 
command")
+                self._exec_pod_command(resp, fallback_xcom_kill_command)
+
+    def _exec_pod_command(self, resp, command: str) -> tuple[str | None, str | 
None]:

Review Comment:
   Why add a second return value instead of raising an exception on error?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -907,27 +907,39 @@ def extract_xcom_kill(self, pod: V1Pod):
                 _preload_content=False,
             )
         ) as resp:
-            self._exec_pod_command(resp, "kill -2 $(pgrep -u $(id -u) -f 
'sh')")
-
-    def _exec_pod_command(self, resp, command: str) -> str | None:
+            xcom_kill_command = "kill -2 $(pgrep -u $(id -u) -f 'sh')"
+            # fallback command for containers that don't support pgrep -u
+            fallback_xcom_kill_command = (
+                "for f in /proc/[0-9]*/comm; do "
+                "[ -O $f ] && read c < $f && [ \"$c\" = \"sh\" ] && 
pid=${f%/comm} && kill -2 ${pid##*/}; "
+                "done"
+            )
+            result, error = self._exec_pod_command(resp, xcom_kill_command)
+            
+            if error:
+                self.log.info("Primary kill command failed, trying fallback 
command")
+                self._exec_pod_command(resp, fallback_xcom_kill_command)
+
+    def _exec_pod_command(self, resp, command: str) -> tuple[str | None, str | 
None]:
+        """Execute a command in the pod and return the result and error 
(result, error)."""
         res = ""
+        error_res = ""
         if not resp.is_open():
-            return None
+            return None, None
         self.log.info("Running command... %s", command)
         resp.write_stdin(f"{command}\n")
         while resp.is_open():
             resp.update(timeout=1)
             while resp.peek_stdout():
                 res += resp.read_stdout()
-            error_res = ""
             while resp.peek_stderr():
                 error_res += resp.read_stderr()
             if error_res:
                 self.log.info("stderr from command: %s", error_res)
                 break
             if res:
-                return res
-        return None
+                return res, error_res or None

Review Comment:
   You may not need to convert to `None` here, since an empty string `""` is already falsy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to