kaxil commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r766962336



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +559,64 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
-        return pod
 
-    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
-        Creates a new pod and monitors for duration of task
-
-        :param labels: labels used to track pod
-        :param launcher: pod launcher that will manage launching and monitoring pods
-        :return:
-        """
-        self.log.debug(
-            "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
-        )
+        labels = self._create_labels_for_pod(context)
+        self.log.info("creating pod with labels %s and launcher %s", labels, self.launcher)
 
         # Merge Pod Identifying labels with labels passed to operator
-        self.pod.metadata.labels.update(labels)
+        self.log.debug("Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id)
+        pod.metadata.labels.update(labels)
         # Add Airflow Version to the label
         # And a label to identify that pod is launched by KubernetesPodOperator
-        self.pod.metadata.labels.update(
+        pod.metadata.labels.update(
             {
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_pod_operator': 'True',
             }
         )
+        return pod
 
-        self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
-        final_state = None
-        try:
-            launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
-            final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
-        except AirflowException:
-            if self.log_events_on_failure:
-                for event in launcher.read_pod_events(self.pod).items:
-                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            raise
-        finally:
-            if self.is_delete_operator_pod:
-                self.log.debug("Deleting pod for task %s", self.task_id)
-                launcher.delete_pod(self.pod)
-            elif final_state != State.SUCCESS:
-                self.patch_already_checked(self.pod)
-        return final_state, remote_pod, result
 
-    def patch_already_checked(self, pod: k8s.V1Pod):
-        """Add an "already tried annotation to ensure we only retry once"""
-        pod.metadata.labels["already_checked"] = "True"
-        body = PodGenerator.serialize_pod(pod)
-        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
+class _suppress_with_logging(AbstractContextManager):
+    """
+    Context manager to suppress specified exceptions and log them with ``log.error``
+    on a supplied object ``obj``.
 
-    def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
-        """
-        Monitors a pod to completion that was created by a previous KubernetesPodOperator
+    The object ``obj`` must have a logger-compatible object on attribute ``log``.
 
-        :param launcher: pod launcher that will manage launching and monitoring pods
-        :param pod: podspec used to find pod using k8s API
-        :return:
-        """
-        try:
-            (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
-        finally:
-            if self.is_delete_operator_pod:
-                launcher.delete_pod(pod)
-        if final_state != State.SUCCESS:
-            if self.log_events_on_failure:
-                for event in launcher.read_pod_events(pod).items:
-                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            if not self.is_delete_operator_pod:
-                self.patch_already_checked(pod)
-            raise AirflowException(f'Pod returned a failure: {final_state}')
-        return final_state, remote_pod, result
+    After the exception is suppressed, execution proceeds with the next
+    statement following the with statement.
 
-    def on_kill(self) -> None:
-        if self.pod:
-            pod: k8s.V1Pod = self.pod
-            namespace = pod.metadata.namespace
-            name = pod.metadata.name
-            kwargs = {}
-            if self.termination_grace_period is not None:
-                kwargs = {"grace_period_seconds": self.termination_grace_period}
-            self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs)
+    Example usage:
+
+    .. code:: python
+
+        class A(BaseOperator):
+            def do_something(self):
+                with _suppress_with_logging(self, ValueError) as cm:
+                    raise ValueError("Some failure")
+                print("I am still reached")
+                if cm.exception:
+                    print(f"here's evidence I caught something: {cm.exception!r}")
+
+
+        a = A(task_id="hello")
+        a.do_something()
+    """
+
+    def __init__(self, obj, *exceptions):
+        self._exceptions = exceptions
+        if not (hasattr(obj, 'log') and hasattr(obj.log, 'error')):
+            raise ValueError("Object has no logger attribute `log`")
+        self.obj = obj
+        self.exception = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excinst, exctb):
+        caught_error = exctype is not None and issubclass(exctype, self._exceptions)
+        if caught_error:
+            self.exception = excinst
+            self.obj.log.error(str(excinst), exc_info=True)
+        return caught_error

Review comment:
       d




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to