This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e6c0ab252 Replace pod_manager.read_pod_logs with client.read_namespaced_pod_log in KubernetesPodOperator._write_logs (#39112)
0e6c0ab252 is described below

commit 0e6c0ab252ee0df900e2e1b7b04283923d1f6ce7
Author: Wei Lee <[email protected]>
AuthorDate: Sun May 5 14:21:11 2024 +0800

    Replace pod_manager.read_pod_logs with client.read_namespaced_pod_log in KubernetesPodOperator._write_logs (#39112)
    
    * style(cncf): add type annotation and remove redundant else-block
    
    * refactor(cncf): move deprecated method to the end for better readability
    
    * style(cncf): use f-string instead of string concatenation
    
    * refactor(cncf): reduce one comparison for should_delete_pod
    
    * refactor(cncf): rename write_logs as _write_logs
    
    * refactor(cncf): move read_pod_logs logic to _write_logs
    
    * test(cncf): unify mock.patch as patch
---
 airflow/providers/cncf/kubernetes/operators/pod.py | 70 +++++++++++-----------
 .../cncf/kubernetes/operators/test_pod.py          | 24 ++++----
 2 files changed, 48 insertions(+), 46 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 4a3c7312ed..0cb08033b4 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -553,7 +553,7 @@ class KubernetesPodOperator(BaseOperator):
 
         return pod_request_obj
 
-    def await_pod_start(self, pod: k8s.V1Pod):
+    def await_pod_start(self, pod: k8s.V1Pod) -> None:
         try:
             self.pod_manager.await_pod_start(
                 pod=pod,
@@ -565,23 +565,23 @@ class KubernetesPodOperator(BaseOperator):
                 self._read_pod_events(pod, reraise=False)
             raise
 
-    def extract_xcom(self, pod: k8s.V1Pod):
+    def extract_xcom(self, pod: k8s.V1Pod) -> dict[Any, Any] | None:
         """Retrieve xcom value and kill xcom sidecar container."""
         result = self.pod_manager.extract_xcom(pod)
         if isinstance(result, str) and result.rstrip() == EMPTY_XCOM_RESULT:
             self.log.info("xcom result file is empty.")
             return None
-        else:
-            self.log.info("xcom result: \n%s", result)
-            return json.loads(result)
+
+        self.log.info("xcom result: \n%s", result)
+        return json.loads(result)
 
     def execute(self, context: Context):
         """Based on the deferrable parameter runs the pod asynchronously or synchronously."""
-        if self.deferrable:
-            self.execute_async(context)
-        else:
+        if not self.deferrable:
             return self.execute_sync(context)
 
+        self.execute_async(context)
+
     def execute_sync(self, context: Context):
         result = None
         try:
@@ -669,7 +669,7 @@ class KubernetesPodOperator(BaseOperator):
         del self.client
         del self.pod_manager
 
-    def execute_async(self, context: Context):
+    def execute_async(self, context: Context) -> None:
         self.pod_request_obj = self.build_pod_request_obj(context)
         self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
             pod_request_obj=self.pod_request_obj,
@@ -687,7 +687,7 @@ class KubernetesPodOperator(BaseOperator):
 
         self.invoke_defer_method()
 
-    def invoke_defer_method(self, last_log_time: DateTime | None = None):
+    def invoke_defer_method(self, last_log_time: DateTime | None = None) -> None:
         """Redefine triggers which are being used in child classes."""
         trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
         self.defer(
@@ -742,7 +742,7 @@ class KubernetesPodOperator(BaseOperator):
             if event["status"] in ("error", "failed", "timeout"):
                 # fetch some logs when pod is failed
                 if self.get_logs:
-                    self.write_logs(self.pod, follow=follow, since_time=last_log_time)
+                    self._write_logs(self.pod, follow=follow, since_time=last_log_time)
 
                 if self.do_xcom_push:
                     _ = self.extract_xcom(pod=self.pod)
@@ -770,7 +770,7 @@ class KubernetesPodOperator(BaseOperator):
             elif event["status"] == "success":
                 # fetch some logs when pod is executed successfully
                 if self.get_logs:
-                    self.write_logs(self.pod, follow=follow, since_time=last_log_time)
+                    self._write_logs(self.pod, follow=follow, since_time=last_log_time)
 
                 if self.do_xcom_push:
                     xcom_sidecar_output = self.extract_xcom(pod=self.pod)
@@ -781,7 +781,7 @@ class KubernetesPodOperator(BaseOperator):
         finally:
             self._clean(event)
 
-    def _clean(self, event: dict[str, Any]):
+    def _clean(self, event: dict[str, Any]) -> None:
         if event["status"] == "running":
             return
         istio_enabled = self.is_istio_enabled(self.pod)
@@ -797,27 +797,27 @@ class KubernetesPodOperator(BaseOperator):
                 remote_pod=self.pod,
             )
 
-    @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
-    def execute_complete(self, context: Context, event: dict, **kwargs):
-        return self.trigger_reentry(context=context, event=event)
-
-    def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime | None = None):
+    def _write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime | None = None) -> None:
         try:
             since_seconds = (
                 math.ceil((datetime.datetime.now(tz=datetime.timezone.utc) - since_time).total_seconds())
                 if since_time
                 else None
             )
-            logs = self.pod_manager.read_pod_logs(
+            logs = self.client.read_namespaced_pod_log(
+                name=pod.metadata.name,
+                namespace=pod.metadata.namespace,
                 pod=pod,
                 container_name=self.base_container_name,
                 follow=follow,
+                timestamps=False,
                 since_seconds=since_seconds,
+                _preload_content=False,
             )
             for raw_line in logs:
                 line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
                 if line:
-                    self.log.info("Container logs: %s", line)
+                    self.log.info("[%s] logs: %s", self.base_container_name, line)
         except HTTPError as e:
             self.log.warning(
                 "Reading of logs interrupted with error %r; will retry. "
@@ -825,7 +825,7 @@ class KubernetesPodOperator(BaseOperator):
                 e,
             )
 
-    def post_complete_action(self, *, pod, remote_pod, **kwargs):
+    def post_complete_action(self, *, pod, remote_pod, **kwargs) -> None:
         """Actions that must be done after operator finishes logic of the deferrable_execution."""
         self.cleanup(
             pod=pod,
@@ -893,7 +893,7 @@ class KubernetesPodOperator(BaseOperator):
                 )
             )
 
-    def _read_pod_events(self, pod, *, reraise=True):
+    def _read_pod_events(self, pod, *, reraise=True) -> None:
         """Will fetch and emit events from pod."""
         with _optionally_suppress(reraise=reraise):
             for event in self.pod_manager.read_pod_events(pod).items:
@@ -941,15 +941,11 @@ class KubernetesPodOperator(BaseOperator):
     def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
         with _optionally_suppress(reraise=reraise):
             if pod is not None:
-                should_delete_pod = (
-                    (self.on_finish_action == OnFinishAction.DELETE_POD)
-                    or (
-                        self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
-                        and pod.status.phase == PodPhase.SUCCEEDED
-                    )
-                    or (
-                        self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
-                        and container_is_succeeded(pod, self.base_container_name)
+                should_delete_pod = (self.on_finish_action == OnFinishAction.DELETE_POD) or (
+                    self.on_finish_action == OnFinishAction.DELETE_SUCCEEDED_POD
+                    and (
+                        pod.status.phase == PodPhase.SUCCEEDED
+                        or container_is_succeeded(pod, self.base_container_name)
                     )
                 )
                 if should_delete_pod:
@@ -966,8 +962,8 @@ class KubernetesPodOperator(BaseOperator):
         label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
         labels_value = ",".join(label_strings)
         if exclude_checked:
-            labels_value += f",{self.POD_CHECKED_KEY}!=True"
-        labels_value += ",!airflow-worker"
+            labels_value = f"{labels_value},{self.POD_CHECKED_KEY}!=True"
+        labels_value = f"{labels_value},!airflow-worker"
         return labels_value
 
     @staticmethod
@@ -1129,6 +1125,10 @@ class KubernetesPodOperator(BaseOperator):
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))
 
+    @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
+    def execute_complete(self, context: Context, event: dict, **kwargs):
+        return self.trigger_reentry(context=context, event=event)
+
 
 class _optionally_suppress(AbstractContextManager):
     """
@@ -1142,7 +1142,7 @@ class _optionally_suppress(AbstractContextManager):
     :meta private:
     """
 
-    def __init__(self, *exceptions, reraise=False):
+    def __init__(self, *exceptions, reraise: bool = False) -> None:
         self._exceptions = exceptions or (Exception,)
         self.reraise = reraise
         self.exception = None
@@ -1150,7 +1150,7 @@ class _optionally_suppress(AbstractContextManager):
     def __enter__(self):
         return self
 
-    def __exit__(self, exctype, excinst, exctb):
+    def __exit__(self, exctype, excinst, exctb) -> bool:
         error = exctype is not None
         matching_error = error and issubclass(exctype, self._exceptions)
         if (error and not matching_error) or (matching_error and self.reraise):
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 4a08e736f6..fe7c941460 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1999,7 +1999,7 @@ class TestKubernetesPodOperatorAsync:
 
     @pytest.mark.parametrize("get_logs", [True, False])
     @patch(KUB_OP_PATH.format("post_complete_action"))
-    @patch(KUB_OP_PATH.format("write_logs"))
+    @patch(KUB_OP_PATH.format("_write_logs"))
     @patch(POD_MANAGER_CLASS)
     @patch(HOOK_CLASS)
     def test_async_get_logs_should_execute_successfully(
@@ -2075,9 +2075,9 @@ class TestKubernetesPodOperatorAsync:
         with pytest.raises(AirflowException, match=expect_match):
             k.cleanup(pod, pod)
 
-    @mock.patch(f"{HOOK_CLASS}.get_pod")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
+    @patch(f"{HOOK_CLASS}.get_pod")
+    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
+    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
     def test_get_logs_running(
         self,
         fetch_container_logs,
@@ -2097,10 +2097,11 @@ class TestKubernetesPodOperatorAsync:
             )
         fetch_container_logs.is_called_with(pod, "base")
 
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
-    def test_get_logs_not_running(self, fetch_container_logs, find_pod, cleanup):
+    @patch(KUB_OP_PATH.format("_write_logs"))
+    @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
+    @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
+    @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs")
+    def test_get_logs_not_running(self, fetch_container_logs, find_pod, cleanup, mock_write_log):
         pod = MagicMock()
         find_pod.return_value = pod
         op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)
@@ -2110,9 +2111,10 @@ class TestKubernetesPodOperatorAsync:
         )
         fetch_container_logs.is_called_with(pod, "base")
 
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
-    @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
-    def test_trigger_error(self, find_pod, cleanup):
+    @patch(KUB_OP_PATH.format("_write_logs"))
+    @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup")
+    @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod")
+    def test_trigger_error(self, find_pod, cleanup, mock_write_log):
         """Assert that trigger_reentry raise exception in case of error"""
         find_pod.return_value = MagicMock()
         op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True)

Reply via email to