dstandish commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r766915248



##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self, 
mock_patch_already_checked, mo
             task_id="task",
             is_delete_operator_pod=False,
         )
-        # Run it first to easily get the pod
-        pod = self.run_pod(k)
-
-        # Now try and "reattach"
-        mock_patch_already_checked.reset_mock()
-        mock_delete_pod.reset_mock()
-        self.client_mock.return_value.list_namespaced_pod.return_value.items = 
[pod]
-        self.monitor_mock.return_value = (State.FAILED, None, None)
+        remote_pod_mock = MagicMock()
+        remote_pod_mock.status.phase = 'Failed'
+        self.await_pod_mock.return_value = remote_pod_mock
 
-        context = self.create_context(k)
+        context = create_context(k)
         with pytest.raises(AirflowException):
             k.execute(context=context)
         mock_patch_already_checked.assert_called_once()
         mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+    with mock.patch('logging.Logger.error') as mock_error:
+
+        class A:
+            log = logging.getLogger()
+
+            def fail(self):
+                with _suppress_with_logging(self, ValueError):
+                    raise ValueError("failure")
+
+        a = A()
+        a.fail()
+        mock_error.assert_called_once_with("failure", exc_info=True)

Review comment:
       OK so yeah I knew this would raise an eyebrow
   
   There are a variety of "cleanup" tasks that we want to do at execution end, 
whether the run is successful or not:
   * delete operator
   * harvest logs on failure
   * patch already checked
   
   Add to that push xcom, which as you know there's a request to move that in 
the finally block.
   
   Any one of these distinct cleanup tasks could fail for a variety of reasons. 
 But I want to attempt each of them, even if th others fail.
   
   So I thought it best to suppress the exceptions when making each call.
   
   But with the built-in `suppress` context mgr, if it _does_ fail, the 
exceptioned will be swallowed with no logging.  So what this one does is 
suppress, but also log that there was a failure.
   
   We could achieve the same with multiple try/excepts in the finally, but this 
is cleaner.
   
   Admittedly it's a bit odd...
   
   WDYT?

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -163,8 +175,14 @@ class KubernetesPodOperator(BaseOperator):
     :param termination_grace_period: Termination grace period if task killed 
in UI,
         defaults to kubernetes default
     :type termination_grace_period: int
+
+    TODO: ``is_delete_operator_pod`` default should be True

Review comment:
       this was a discussion point in the early part of the process for this 
operator.  @jedcunningham suggested taking the opportunity to fix a few things 
as part of the refactor.  along with removing some of the duplication, he 
suggested flipping delete pod to True, and adding support for usage of k8s 
hook.  if you're cool with both of these changes, i can add them both to this 
PR.  or we can defer them -- either way i can remove that TODO but let's 
resolve what to do here first.

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, 
extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            raise AirflowException(f'More than one pod running with labels 
{label_selector}')
+        elif len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod

Review comment:
       this would  be a good case for walrus operator :) 

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +130,104 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
         """
-        Launches the pod synchronously and waits for completion.
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+        """
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to 
start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, 
Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. 
"
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
         """
-        Monitors a pod and returns the final state, pod and xcom result
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+
+        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ 
to loop
+        # but in a long-running process we might lose connectivity and this 
way we
+        # can resume following the logs
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = 
self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like 
ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', pod.metadata.name)
+                self.log.warning('Pod %s log read interrupted', 
pod.metadata.name)
+            else:
+                container_stopped = True  # fetch logs once more and exit
+
+    def await_container(self, pod: V1Pod, container_name: str) -> None:
+        while not self.container_is_running(pod=pod, 
container_name=container_name):
+            time.sleep(1)

Review comment:
       > If we want to keep it probably rename it to await_container_completion 
?
   
   sure
   
   > Do we really need a separate method here? 
   
   personally, even though this method is very  simple, i would prefer to keep 
`execute` higher level.  going through and looping is to me too low-level for 
the `execute` method, at least in the case of this operator which already has a 
lot going on.  ideally execute should be easy to understand just by reading 
method names.
   

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +130,104 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
         """
-        Launches the pod synchronously and waits for completion.
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+        """
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to 
start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, 
Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. 
"
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
         """
-        Monitors a pod and returns the final state, pod and xcom result
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+
+        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ 
to loop
+        # but in a long-running process we might lose connectivity and this 
way we
+        # can resume following the logs
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = 
self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like 
ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', pod.metadata.name)
+                self.log.warning('Pod %s log read interrupted', 
pod.metadata.name)
+            else:
+                container_stopped = True  # fetch logs once more and exit
+
+    def await_container(self, pod: V1Pod, container_name: str) -> None:
+        while not self.container_is_running(pod=pod, 
container_name=container_name):
+            time.sleep(1)

Review comment:
       > If we want to keep it probably rename it to await_container_completion 
?
   
   sure
   
   > Do we really need a separate method here? 
   
   personally, even though this method is very  simple, i would prefer to keep 
the logic out of execute.  
   
   going through and looping is to me too low-level for the `execute` method, 
at least in the case of this operator which already has a lot going on.  
   
   ideally execute should be easy to understand just by reading method names.
   

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, 
extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            raise AirflowException(f'More than one pod running with labels 
{label_selector}')
+        elif len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or 
pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                return pod
+        self.log.debug("Starting pod:\n%s", 
yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, 
config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
-
-            pod_list = self.client.list_namespaced_pod(self.namespace, 
label_selector=label_selector)
+            self.launcher.await_pod_start(pod=pod, 
startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
+            raise
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
-                )
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info(result)
+        # todo: add tests re handling of json vs non-json-serializable values

Review comment:
       this was in part a note to self to verify behavior... existing behavior 
requires that xcom result be json and this refactor keeps this behavior. will 
remove this line.

##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self, 
mock_patch_already_checked, mo
             task_id="task",
             is_delete_operator_pod=False,
         )
-        # Run it first to easily get the pod
-        pod = self.run_pod(k)
-
-        # Now try and "reattach"
-        mock_patch_already_checked.reset_mock()
-        mock_delete_pod.reset_mock()
-        self.client_mock.return_value.list_namespaced_pod.return_value.items = 
[pod]
-        self.monitor_mock.return_value = (State.FAILED, None, None)
+        remote_pod_mock = MagicMock()
+        remote_pod_mock.status.phase = 'Failed'
+        self.await_pod_mock.return_value = remote_pod_mock
 
-        context = self.create_context(k)
+        context = create_context(k)
         with pytest.raises(AirflowException):
             k.execute(context=context)
         mock_patch_already_checked.assert_called_once()
         mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+    with mock.patch('logging.Logger.error') as mock_error:
+
+        class A:
+            log = logging.getLogger()
+
+            def fail(self):
+                with _suppress_with_logging(self, ValueError):
+                    raise ValueError("failure")
+
+        a = A()
+        a.fail()
+        mock_error.assert_called_once_with("failure", exc_info=True)

Review comment:
       i have simplified this context manager to make it easier to use  and 
understand. take another look when you have a moment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to