kaxil commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r766956061
##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self,
mock_patch_already_checked, mo
task_id="task",
is_delete_operator_pod=False,
)
- # Run it first to easily get the pod
- pod = self.run_pod(k)
-
- # Now try and "reattach"
- mock_patch_already_checked.reset_mock()
- mock_delete_pod.reset_mock()
- self.client_mock.return_value.list_namespaced_pod.return_value.items =
[pod]
- self.monitor_mock.return_value = (State.FAILED, None, None)
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = 'Failed'
+ self.await_pod_mock.return_value = remote_pod_mock
- context = self.create_context(k)
+ context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+ with mock.patch('logging.Logger.error') as mock_error:
+
+ class A:
+ log = logging.getLogger()
+
+ def fail(self):
+ with _suppress_with_logging(self, ValueError):
+ raise ValueError("failure")
+
+ a = A()
+ a.fail()
+ mock_error.assert_called_once_with("failure", exc_info=True)
Review comment:
`test_suppress_with_logging` is not called anywhere, so how does it work?
It feels really odd, and if we want to use it, the definition needs a lot of
comments in the code explaining to future devs what it does and why.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +559,64 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
- return pod
- def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State,
k8s.V1Pod, Optional[str]]:
- """
- Creates a new pod and monitors for duration of task
-
- :param labels: labels used to track pod
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :return:
- """
- self.log.debug(
- "Adding KubernetesPodOperator labels to pod before launch for task
%s", self.task_id
- )
+ labels = self._create_labels_for_pod(context)
+ self.log.info("creating pod with labels %s and launcher %s", labels,
self.launcher)
# Merge Pod Identifying labels with labels passed to operator
- self.pod.metadata.labels.update(labels)
+ self.log.debug("Adding KubernetesPodOperator labels to pod before
launch for task %s", self.task_id)
+ pod.metadata.labels.update(labels)
# Add Airflow Version to the label
# And a label to identify that pod is launched by KubernetesPodOperator
- self.pod.metadata.labels.update(
+ pod.metadata.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_pod_operator': 'True',
}
)
+ return pod
- self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
- final_state = None
- try:
- launcher.start_pod(self.pod,
startup_timeout=self.startup_timeout_seconds)
- final_state, remote_pod, result =
launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
- except AirflowException:
- if self.log_events_on_failure:
- for event in launcher.read_pod_events(self.pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
- raise
- finally:
- if self.is_delete_operator_pod:
- self.log.debug("Deleting pod for task %s", self.task_id)
- launcher.delete_pod(self.pod)
- elif final_state != State.SUCCESS:
- self.patch_already_checked(self.pod)
- return final_state, remote_pod, result
- def patch_already_checked(self, pod: k8s.V1Pod):
- """Add an "already tried annotation to ensure we only retry once"""
- pod.metadata.labels["already_checked"] = "True"
- body = PodGenerator.serialize_pod(pod)
- self.client.patch_namespaced_pod(pod.metadata.name,
pod.metadata.namespace, body)
+class _suppress_with_logging(AbstractContextManager):
+ """
+ Context manager to suppress specified exceptions and log them with
``log.error``
+ on a supplied object ``obj``.
- def monitor_launched_pod(self, launcher, pod) -> Tuple[State,
Optional[str]]:
- """
- Monitors a pod to completion that was created by a previous
KubernetesPodOperator
+ The object ``obj`` must have a logger-compatible object on attribute
``log``.
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :param pod: podspec used to find pod using k8s API
- :return:
- """
- try:
- (final_state, remote_pod, result) = launcher.monitor_pod(pod,
get_logs=self.get_logs)
- finally:
- if self.is_delete_operator_pod:
- launcher.delete_pod(pod)
- if final_state != State.SUCCESS:
- if self.log_events_on_failure:
- for event in launcher.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
- if not self.is_delete_operator_pod:
- self.patch_already_checked(pod)
- raise AirflowException(f'Pod returned a failure: {final_state}')
- return final_state, remote_pod, result
+ After the exception is suppressed, execution proceeds with the next
+ statement following the with statement.
- def on_kill(self) -> None:
- if self.pod:
- pod: k8s.V1Pod = self.pod
- namespace = pod.metadata.namespace
- name = pod.metadata.name
- kwargs = {}
- if self.termination_grace_period is not None:
- kwargs = {"grace_period_seconds":
self.termination_grace_period}
- self.client.delete_namespaced_pod(name=name, namespace=namespace,
**kwargs)
+ Example usage:
+
+ .. code:: python
+
+ class A(BaseOperator):
+ def do_something(self):
+ with _suppress_with_logging(self, ValueError) as cm:
+ raise ValueError("Some failure")
+ print("I am still reached")
+ if cm.exception:
+ print(f"here's evidence I caught something:
{cm.exception!r}")
+
+
+ a = A(task_id="hello")
+ a.do_something()
+ """
+
+ def __init__(self, obj, *exceptions):
+ self._exceptions = exceptions
+ if not (hasattr(obj, 'log') and hasattr(obj.log, 'error')):
+ raise ValueError("Object has no logger attribute `log`")
+ self.obj = obj
+ self.exception = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exctype, excinst, exctb):
+ caught_error = exctype is not None and issubclass(exctype,
self._exceptions)
+ if caught_error:
+ self.exception = excinst
+ self.obj.log.error(str(excinst), exc_info=True)
+ return caught_error
Review comment:
Is this written just for tests?
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +559,64 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
- return pod
- def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State,
k8s.V1Pod, Optional[str]]:
- """
- Creates a new pod and monitors for duration of task
-
- :param labels: labels used to track pod
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :return:
- """
- self.log.debug(
- "Adding KubernetesPodOperator labels to pod before launch for task
%s", self.task_id
- )
+ labels = self._create_labels_for_pod(context)
+ self.log.info("creating pod with labels %s and launcher %s", labels,
self.launcher)
# Merge Pod Identifying labels with labels passed to operator
- self.pod.metadata.labels.update(labels)
+ self.log.debug("Adding KubernetesPodOperator labels to pod before
launch for task %s", self.task_id)
+ pod.metadata.labels.update(labels)
# Add Airflow Version to the label
# And a label to identify that pod is launched by KubernetesPodOperator
- self.pod.metadata.labels.update(
+ pod.metadata.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_pod_operator': 'True',
}
)
+ return pod
- self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
- final_state = None
- try:
- launcher.start_pod(self.pod,
startup_timeout=self.startup_timeout_seconds)
- final_state, remote_pod, result =
launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
- except AirflowException:
- if self.log_events_on_failure:
- for event in launcher.read_pod_events(self.pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
- raise
- finally:
- if self.is_delete_operator_pod:
- self.log.debug("Deleting pod for task %s", self.task_id)
- launcher.delete_pod(self.pod)
- elif final_state != State.SUCCESS:
- self.patch_already_checked(self.pod)
- return final_state, remote_pod, result
- def patch_already_checked(self, pod: k8s.V1Pod):
- """Add an "already tried annotation to ensure we only retry once"""
- pod.metadata.labels["already_checked"] = "True"
- body = PodGenerator.serialize_pod(pod)
- self.client.patch_namespaced_pod(pod.metadata.name,
pod.metadata.namespace, body)
+class _suppress_with_logging(AbstractContextManager):
+ """
+ Context manager to suppress specified exceptions and log them with
``log.error``
+ on a supplied object ``obj``.
- def monitor_launched_pod(self, launcher, pod) -> Tuple[State,
Optional[str]]:
- """
- Monitors a pod to completion that was created by a previous
KubernetesPodOperator
+ The object ``obj`` must have a logger-compatible object on attribute
``log``.
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :param pod: podspec used to find pod using k8s API
- :return:
- """
- try:
- (final_state, remote_pod, result) = launcher.monitor_pod(pod,
get_logs=self.get_logs)
- finally:
- if self.is_delete_operator_pod:
- launcher.delete_pod(pod)
- if final_state != State.SUCCESS:
- if self.log_events_on_failure:
- for event in launcher.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason,
event.message)
- if not self.is_delete_operator_pod:
- self.patch_already_checked(pod)
- raise AirflowException(f'Pod returned a failure: {final_state}')
- return final_state, remote_pod, result
+ After the exception is suppressed, execution proceeds with the next
+ statement following the with statement.
- def on_kill(self) -> None:
- if self.pod:
- pod: k8s.V1Pod = self.pod
- namespace = pod.metadata.namespace
- name = pod.metadata.name
- kwargs = {}
- if self.termination_grace_period is not None:
- kwargs = {"grace_period_seconds":
self.termination_grace_period}
- self.client.delete_namespaced_pod(name=name, namespace=namespace,
**kwargs)
+ Example usage:
+
+ .. code:: python
+
+ class A(BaseOperator):
+ def do_something(self):
+ with _suppress_with_logging(self, ValueError) as cm:
+ raise ValueError("Some failure")
+ print("I am still reached")
+ if cm.exception:
+ print(f"here's evidence I caught something:
{cm.exception!r}")
+
+
+ a = A(task_id="hello")
+ a.do_something()
+ """
+
+ def __init__(self, obj, *exceptions):
+ self._exceptions = exceptions
+ if not (hasattr(obj, 'log') and hasattr(obj.log, 'error')):
+ raise ValueError("Object has no logger attribute `log`")
+ self.obj = obj
+ self.exception = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exctype, excinst, exctb):
+ caught_error = exctype is not None and issubclass(exctype,
self._exceptions)
+ if caught_error:
+ self.exception = excinst
+ self.obj.log.error(str(excinst), exc_info=True)
+ return caught_error
Review comment:
d
##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self,
mock_patch_already_checked, mo
task_id="task",
is_delete_operator_pod=False,
)
- # Run it first to easily get the pod
- pod = self.run_pod(k)
-
- # Now try and "reattach"
- mock_patch_already_checked.reset_mock()
- mock_delete_pod.reset_mock()
- self.client_mock.return_value.list_namespaced_pod.return_value.items =
[pod]
- self.monitor_mock.return_value = (State.FAILED, None, None)
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = 'Failed'
+ self.await_pod_mock.return_value = remote_pod_mock
- context = self.create_context(k)
+ context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+ with mock.patch('logging.Logger.error') as mock_error:
+
+ class A:
+ log = logging.getLogger()
+
+ def fail(self):
+ with _suppress_with_logging(self, ValueError):
+ raise ValueError("failure")
+
+ a = A()
+ a.fail()
+ mock_error.assert_called_once_with("failure", exc_info=True)
Review comment:
I see... this is actually used by the code at
https://github.com/apache/airflow/blob/682c475d0b804f505f123e2da0c7d64e0b93f9b4/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L580-L605
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -163,8 +175,14 @@ class KubernetesPodOperator(BaseOperator):
:param termination_grace_period: Termination grace period if task killed
in UI,
defaults to kubernetes default
:type termination_grace_period: int
+
+ TODO: ``is_delete_operator_pod`` default should be True
Review comment:
Let's change that in a separate PR -- so that this PR just
simplifies/refactors things but keeps the same behavior
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]