dstandish commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r766915248
##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mo
task_id="task",
is_delete_operator_pod=False,
)
- # Run it first to easily get the pod
- pod = self.run_pod(k)
-
- # Now try and "reattach"
- mock_patch_already_checked.reset_mock()
- mock_delete_pod.reset_mock()
- self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod]
- self.monitor_mock.return_value = (State.FAILED, None, None)
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = 'Failed'
+ self.await_pod_mock.return_value = remote_pod_mock
- context = self.create_context(k)
+ context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+ with mock.patch('logging.Logger.error') as mock_error:
+
+ class A:
+ log = logging.getLogger()
+
+ def fail(self):
+ with _suppress_with_logging(self, ValueError):
+ raise ValueError("failure")
+
+ a = A()
+ a.fail()
+ mock_error.assert_called_once_with("failure", exc_info=True)
Review comment:
OK, so yeah, I knew this would raise an eyebrow.
There are a variety of "cleanup" tasks that we want to do at execution end,
whether the run is successful or not:
* delete operator
* harvest logs on failure
* patch already checked
Add to that pushing XCom, which, as you know, there's a request to move into
the `finally` block.
Any one of these distinct cleanup tasks could fail for a variety of reasons,
but I want to attempt each of them even if the others fail.
So I thought it best to suppress the exceptions when making each call.
But with the built-in `suppress` context manager, if a call _does_ fail, the
exception is swallowed with no logging. So what this one does is suppress,
but also log that there was a failure.
We could achieve the same with multiple try/excepts in the finally block, but
this is cleaner. Roughly, the idea is:
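
```python
# A minimal sketch of the idea, written to match the behavior the test
# above asserts -- an illustration, not necessarily the exact
# implementation in this PR.
from contextlib import contextmanager


@contextmanager
def _suppress_with_logging(obj, *exceptions):
    """Like ``contextlib.suppress``, but logs the suppressed exception via ``obj.log``."""
    try:
        yield
    except exceptions as e:
        # log the failure (with traceback) instead of swallowing it silently
        obj.log.error(str(e), exc_info=True)
```

Each cleanup step then gets its own `with` block, so a failure in one step is
logged but doesn't prevent the remaining steps from running.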
Admittedly it's a bit odd...
WDYT?
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -163,8 +175,14 @@ class KubernetesPodOperator(BaseOperator):
:param termination_grace_period: Termination grace period if task killed in UI,
defaults to kubernetes default
:type termination_grace_period: int
+
+ TODO: ``is_delete_operator_pod`` default should be True
Review comment:
this was a discussion point early in the process for this operator.
@jedcunningham suggested taking the opportunity to fix a few things as part of
the refactor. along with removing some of the duplication, he suggested
flipping the delete pod default to True, and adding support for using the k8s
hook. if you're cool with both of these changes, i can add them to this PR,
or we can defer them -- either way i can remove that TODO, but let's resolve
what to do here first.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one
exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ )
+ if len(pod_list.items) > 1:
+ raise AirflowException(f'More than one pod running with labels {label_selector}')
+ elif len(pod_list.items) == 1:
+ pod = pod_list.items[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
Review comment:
this would be a good case for walrus operator :)
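for illustration, the suggestion would look something like this (a sketch of
the same body as quoted above, not necessarily the final code):

```python
# Bind the pod count once with the walrus operator, then branch on it.
if (num_pods := len(pod_list.items)) > 1:
    raise AirflowException(f'More than one pod running with labels {label_selector}')
elif num_pods == 1:
    pod = pod_list.items[0]
    self.log.info("Found matching pod %s", pod.metadata.name)
    self._compare_try_numbers(context, pod)
    return pod
```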
##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +130,104 @@ def delete_pod(self, pod: V1Pod) -> None:
reraise=True,
retry=tenacity.retry_if_exception(should_retry_start_pod),
)
- def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+ def create_pod(self, pod: V1Pod) -> V1Pod:
"""
- Launches the pod synchronously and waits for completion.
+ Launches the pod asynchronously.
+
+ :param pod:
+ :return:
+ """
+ return self.run_pod_async(pod)
+
+ def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+ """
+ Waits for the pod to reach phase other than ``Pending``
:param pod:
:param startup_timeout: Timeout (in seconds) for startup of the pod
(if pod is pending for too long, fails task)
:return:
"""
- resp = self.run_pod_async(pod)
curr_time = dt.now()
- if resp.status.start_time is None:
- while self.pod_not_started(pod):
- self.log.warning("Pod not yet started: %s", pod.metadata.name)
- delta = dt.now() - curr_time
- if delta.total_seconds() >= startup_timeout:
- msg = (
- f"Pod took longer than {startup_timeout} seconds to
start. "
- "Check the pod events in kubernetes to determine why."
- )
- raise AirflowException(msg)
- time.sleep(1)
-
- def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+ while True:
+ remote_pod = self.read_pod(pod)
+ if remote_pod.status.phase != PodStatus.PENDING:
+ break
+ self.log.warning("Pod not yet started: %s", pod.metadata.name)
+ delta = dt.now() - curr_time
+ if delta.total_seconds() >= startup_timeout:
+ msg = (
+ f"Pod took longer than {startup_timeout} seconds to start.
"
+ "Check the pod events in kubernetes to determine why."
+ )
+ raise PodLaunchFailedException(msg)
+ time.sleep(1)
+
+ def follow_container_logs(self, pod: V1Pod, container_name: str):
+ """
+ Follows the logs of container and streams to airflow logging.
+ Returns when container exits.
"""
- Monitors a pod and returns the final state, pod and xcom result
+ container_stopped = False
+ read_logs_since_sec = None
+ last_log_time = None
+
+ # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop
+ # but in a long-running process we might lose connectivity and this way we
+ # can resume following the logs
+ while True:
+ try:
+ logs = self.read_pod_logs(
+ pod=pod,
+ container_name=container_name,
+ timestamps=True,
+ since_seconds=read_logs_since_sec,
+ )
+ for line in logs: # type: bytes
+ timestamp, message = self.parse_log_line(line.decode('utf-8'))
+ self.log.info(message)
+ if timestamp:
+ last_log_time = timestamp
+ except BaseHTTPError: # Catches errors like ProtocolError(TimeoutError).
+ self.log.warning(
+ 'Failed to read logs for pod %s',
+ pod.metadata.name,
+ exc_info=True,
+ )
+
+ if container_stopped is True:
+ break
+
+ if last_log_time:
+ delta = pendulum.now() - last_log_time
+ read_logs_since_sec = math.ceil(delta.total_seconds())
+
+ time.sleep(1)
+
+ if self.container_is_running(pod, container_name=container_name):
+ self.log.info('Container %s is running', pod.metadata.name)
+ self.log.warning('Pod %s log read interrupted', pod.metadata.name)
+ else:
+ container_stopped = True # fetch logs once more and exit
+
+ def await_container(self, pod: V1Pod, container_name: str) -> None:
+ while not self.container_is_running(pod=pod, container_name=container_name):
+ time.sleep(1)
Review comment:
> If we want to keep it probably rename it to await_container_completion?
sure
> Do we really need a separate method here?
personally, even though this method is very simple, i would prefer to keep
`execute` higher level. polling in a loop is, to me, too low-level for the
`execute` method, at least in the case of this operator, which already has a
lot going on. ideally `execute` should be easy to understand just by reading
method names -- something like the sketch below.
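
to make that concrete, the shape i'm aiming for is roughly this (a
hypothetical sketch -- method names follow this PR where they exist, but the
real `execute` will differ; `'base'` as the container name is an assumption):

```python
def execute(self, context):
    # each step is a named, self-describing method; no inline polling loops
    self.pod_request_obj = self.create_pod_request_obj()
    self.pod = self.get_or_create_pod(self.pod_request_obj, context)
    self.await_pod_start(self.pod)
    if self.get_logs:
        self.launcher.follow_container_logs(self.pod, container_name='base')
    else:
        self.launcher.await_container_completion(self.pod, container_name='base')
```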
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one
exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ )
+ if len(pod_list.items) > 1:
+ raise AirflowException(f'More than one pod running with labels {label_selector}')
+ elif len(pod_list.items) == 1:
+ pod = pod_list.items[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+ if self.reattach_on_restart:
+ pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+ if pod:
+ return pod
+ self.log.debug("Starting pod:\n%s",
yaml.safe_dump(pod_request_obj.to_dict()))
+ self.launcher.create_pod(pod=pod_request_obj)
+ return pod_request_obj
+
+ def await_pod_start(self, pod):
try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(
- in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- else:
- client = kube_client.get_kube_client(
- cluster_context=self.cluster_context, config_file=self.config_file
- )
-
- self.client = client
-
- self.pod = self.create_pod_request_obj()
- self.namespace = self.pod.metadata.namespace
-
- # Add combination of labels to uniquely identify a running pod
- labels = self.create_labels_for_pod(context)
-
- label_selector = self._get_pod_identifying_label_string(labels)
-
- pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+ self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+ except PodLaunchFailedException:
+ if self.log_events_on_failure:
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ raise
- if len(pod_list.items) > 1 and self.reattach_on_restart:
- raise AirflowException(
- f'More than one pod running with labels: {label_selector}'
- )
+ def extract_xcom(self, pod):
+ """Retrieves xcom value and kills xcom sidecar container"""
+ result = self.launcher.extract_xcom(pod)
+ self.log.info(result)
+ # todo: add tests re handling of json vs non-json-serializable values
Review comment:
this was in part a note to self to verify behavior... existing behavior
requires that the xcom result be json, and this refactor keeps that behavior
(roughly as sketched below). will remove this line.
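
for illustration, the existing behavior amounts to roughly this (a sketch of
the idea, not code from this PR):

```python
import json

# the xcom sidecar hands back the xcom value as a raw string; a value
# that isn't valid json raises json.JSONDecodeError and fails the task
result = self.launcher.extract_xcom(pod)
return json.loads(result)
```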
##########
File path: tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
##########
@@ -762,17 +789,27 @@ def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mo
task_id="task",
is_delete_operator_pod=False,
)
- # Run it first to easily get the pod
- pod = self.run_pod(k)
-
- # Now try and "reattach"
- mock_patch_already_checked.reset_mock()
- mock_delete_pod.reset_mock()
- self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod]
- self.monitor_mock.return_value = (State.FAILED, None, None)
+ remote_pod_mock = MagicMock()
+ remote_pod_mock.status.phase = 'Failed'
+ self.await_pod_mock.return_value = remote_pod_mock
- context = self.create_context(k)
+ context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()
+
+
+def test_suppress_with_logging():
+ with mock.patch('logging.Logger.error') as mock_error:
+
+ class A:
+ log = logging.getLogger()
+
+ def fail(self):
+ with _suppress_with_logging(self, ValueError):
+ raise ValueError("failure")
+
+ a = A()
+ a.fail()
+ mock_error.assert_called_once_with("failure", exc_info=True)
Review comment:
i have simplified this context manager to make it easier to use and
understand. take another look when you have a moment.
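
usage then ends up looking roughly like this (a sketch; the cleanup steps and
`patch_already_checked` helper name are illustrative):

```python
# e.g. in the operator's cleanup (run in execute's finally block):
# attempt each step independently; a failure is logged but doesn't
# stop the remaining steps from running
with _suppress_with_logging(self, Exception):
    self.patch_already_checked(pod)  # illustrative helper name
with _suppress_with_logging(self, Exception):
    self.launcher.delete_pod(pod)
```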