jedcunningham commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r771776507
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client,
extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one
exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ ).items
+
+ num_pods = len(pod_list)
+ if num_pods > 1:
+ raise AirflowException(f'More than one pod running with labels
{label_selector}')
+ elif num_pods == 1:
+ pod = pod_list[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+ if self.reattach_on_restart:
+ pod = self.find_pod(self.namespace or
pod_request_obj.metadata.namespace, context=context)
+ if pod:
+ return pod
+ self.log.debug("Starting pod:\n%s",
yaml.safe_dump(pod_request_obj.to_dict()))
+ self.launcher.create_pod(pod=pod_request_obj)
+ return pod_request_obj
+
+ def await_pod_start(self, pod):
try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(
- in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- else:
- client = kube_client.get_kube_client(
- cluster_context=self.cluster_context,
config_file=self.config_file
- )
-
- self.client = client
-
- self.pod = self.create_pod_request_obj()
- self.namespace = self.pod.metadata.namespace
-
- # Add combination of labels to uniquely identify a running pod
- labels = self.create_labels_for_pod(context)
-
- label_selector = self._get_pod_identifying_label_string(labels)
-
- pod_list = self.client.list_namespaced_pod(self.namespace,
label_selector=label_selector)
+ self.launcher.await_pod_start(pod=pod,
startup_timeout=self.startup_timeout_seconds)
+ except PodLaunchFailedException:
+ if self.log_events_on_failure:
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ raise
- if len(pod_list.items) > 1 and self.reattach_on_restart:
- raise AirflowException(
- f'More than one pod running with labels: {label_selector}'
- )
+ def extract_xcom(self, pod):
+ """Retrieves xcom value and kills xcom sidecar container"""
+ result = self.launcher.extract_xcom(pod)
+ self.log.info(result)
+ return json.loads(result)
- launcher = self.create_pod_launcher()
+ def execute(self, context):
+ remote_pod = None
+ try:
+ self.pod_request_obj = self.build_pod_request_obj(context)
+ self.pod = self.get_or_create_pod( # must set `self.pod` for
`on_kill`
+ pod_request_obj=self.pod_request_obj,
+ context=context,
+ )
+ self.await_pod_start(pod=self.pod)
- if len(pod_list.items) == 1:
- try_numbers_match = self._try_numbers_match(context,
pod_list.items[0])
- final_state, remote_pod, result = self.handle_pod_overlap(
- labels, try_numbers_match, launcher, pod_list.items[0]
+ if self.get_logs:
+ self.launcher.follow_container_logs(
+ pod=self.pod,
+ container_name=self.BASE_CONTAINER_NAME,
)
else:
- self.log.info("creating pod with labels %s and launcher %s",
labels, launcher)
- final_state, remote_pod, result =
self.create_new_pod_for_operator(labels, launcher)
- if final_state != State.SUCCESS:
- raise AirflowException(f'Pod {self.pod.metadata.name} returned
a failure: {remote_pod}')
- context['task_instance'].xcom_push(key='pod_name',
value=self.pod.metadata.name)
- context['task_instance'].xcom_push(key='pod_namespace',
value=self.namespace)
- return result
- except AirflowException as ex:
- raise AirflowException(f'Pod Launching failed: {ex}')
+ self.launcher.await_container_completion(
+ pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+ )
- def handle_pod_overlap(
- self, labels: dict, try_numbers_match: bool, launcher: Any, pod:
k8s.V1Pod
- ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
- """
+ if self.do_xcom_push:
+ result = self.extract_xcom(pod=self.pod)
+ remote_pod = self.launcher.await_pod_completion(self.pod)
+ finally:
+ self.cleanup(
+ pod=self.pod or self.pod_request_obj,
+ remote_pod=remote_pod,
+ )
+ if self.do_xcom_push:
+ ti = context['ti']
+ if remote_pod:
+ ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+ ti.xcom_push(key='pod_namespace',
value=remote_pod.metadata.namespace)
+ return result
- In cases where the Scheduler restarts while a KubernetesPodOperator
task is running,
- this function will either continue to monitor the existing pod or
launch a new pod
- based on the `reattach_on_restart` parameter.
+ def cleanup(self, pod, remote_pod):
+ with _suppress(Exception):
+ self.process_pod_deletion(pod)
- :param labels: labels used to determine if a pod is repeated
- :type labels: dict
- :param try_numbers_match: do the try numbers match? Only needed for
logging purposes
- :type try_numbers_match: bool
- :param launcher: PodLauncher
- :param pod: Pod found with matching labels
- """
- if try_numbers_match:
- log_line = f"found a running pod with labels {labels} and the same
try_number."
- else:
- log_line = f"found a running pod with labels {labels} but a
different try_number."
-
- # In case of failed pods, should reattach the first time, but only once
- # as the task will have already failed.
- if self.reattach_on_restart and not
pod.metadata.labels.get("already_checked"):
- log_line += " Will attach to this pod and monitor instead of
starting new one"
- self.log.info(log_line)
- self.pod = pod
- final_state, remote_pod, result =
self.monitor_launched_pod(launcher, pod)
+ pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status')
else None
+ if pod_phase != PodStatus.SUCCEEDED:
+ if self.log_events_on_failure:
+ with _suppress(Exception):
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ if not self.is_delete_operator_pod:
+ with _suppress(Exception):
+ self.patch_already_checked(pod)
+ raise AirflowException(f'Pod {pod and pod.metadata.name} returned
a failure: {remote_pod}')
+
+ def process_pod_deletion(self, pod):
+ if self.is_delete_operator_pod:
+ self.log.info("deleting pod")
Review comment:
```suggestion
self.log.info("deleting pod: %s", pod.metadata.name)
```
Should we also log the name of the pod?
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client,
extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one
exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ ).items
+
+ num_pods = len(pod_list)
+ if num_pods > 1:
+ raise AirflowException(f'More than one pod running with labels
{label_selector}')
+ elif num_pods == 1:
+ pod = pod_list[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+ if self.reattach_on_restart:
+ pod = self.find_pod(self.namespace or
pod_request_obj.metadata.namespace, context=context)
+ if pod:
+ return pod
+ self.log.debug("Starting pod:\n%s",
yaml.safe_dump(pod_request_obj.to_dict()))
+ self.launcher.create_pod(pod=pod_request_obj)
+ return pod_request_obj
+
+ def await_pod_start(self, pod):
try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(
- in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- else:
- client = kube_client.get_kube_client(
- cluster_context=self.cluster_context,
config_file=self.config_file
- )
-
- self.client = client
-
- self.pod = self.create_pod_request_obj()
- self.namespace = self.pod.metadata.namespace
-
- # Add combination of labels to uniquely identify a running pod
- labels = self.create_labels_for_pod(context)
-
- label_selector = self._get_pod_identifying_label_string(labels)
-
- pod_list = self.client.list_namespaced_pod(self.namespace,
label_selector=label_selector)
+ self.launcher.await_pod_start(pod=pod,
startup_timeout=self.startup_timeout_seconds)
+ except PodLaunchFailedException:
+ if self.log_events_on_failure:
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ raise
- if len(pod_list.items) > 1 and self.reattach_on_restart:
- raise AirflowException(
- f'More than one pod running with labels: {label_selector}'
- )
+ def extract_xcom(self, pod):
+ """Retrieves xcom value and kills xcom sidecar container"""
+ result = self.launcher.extract_xcom(pod)
+ self.log.info(result)
  Review comment:
       Is this log line intentional? If so, we probably want to add some context to the message describing what value is being logged.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -433,7 +476,24 @@ def _set_name(self, name):
validate_key(name, max_length=220)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
- def create_pod_request_obj(self) -> k8s.V1Pod:
+ def patch_already_checked(self, pod: k8s.V1Pod):
+ """Add an "already tried annotation to ensure we only retry once"""
Review comment:
```suggestion
"""Add an "already tried" annotation to ensure we only retry once"""
```
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +561,41 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
- return pod
- def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State,
k8s.V1Pod, Optional[str]]:
- """
- Creates a new pod and monitors for duration of task
-
- :param labels: labels used to track pod
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :return:
- """
- self.log.debug(
- "Adding KubernetesPodOperator labels to pod before launch for task
%s", self.task_id
- )
+ labels = self._create_labels_for_pod(context)
+ self.log.info("creating pod with labels %s and launcher %s", labels,
self.launcher)
# Merge Pod Identifying labels with labels passed to operator
- self.pod.metadata.labels.update(labels)
+ self.log.debug("Adding KubernetesPodOperator labels to pod before
launch for task %s", self.task_id)
Review comment:
```suggestion
```
Not sure this is really that useful? We log the labels themselves as INFO
above.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client,
extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one
exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ ).items
+
+ num_pods = len(pod_list)
+ if num_pods > 1:
+ raise AirflowException(f'More than one pod running with labels
{label_selector}')
+ elif num_pods == 1:
+ pod = pod_list[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+ if self.reattach_on_restart:
+ pod = self.find_pod(self.namespace or
pod_request_obj.metadata.namespace, context=context)
+ if pod:
+ return pod
+ self.log.debug("Starting pod:\n%s",
yaml.safe_dump(pod_request_obj.to_dict()))
+ self.launcher.create_pod(pod=pod_request_obj)
+ return pod_request_obj
+
+ def await_pod_start(self, pod):
try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(
- in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- else:
- client = kube_client.get_kube_client(
- cluster_context=self.cluster_context,
config_file=self.config_file
- )
-
- self.client = client
-
- self.pod = self.create_pod_request_obj()
- self.namespace = self.pod.metadata.namespace
-
- # Add combination of labels to uniquely identify a running pod
- labels = self.create_labels_for_pod(context)
-
- label_selector = self._get_pod_identifying_label_string(labels)
-
- pod_list = self.client.list_namespaced_pod(self.namespace,
label_selector=label_selector)
+ self.launcher.await_pod_start(pod=pod,
startup_timeout=self.startup_timeout_seconds)
+ except PodLaunchFailedException:
+ if self.log_events_on_failure:
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ raise
- if len(pod_list.items) > 1 and self.reattach_on_restart:
- raise AirflowException(
- f'More than one pod running with labels: {label_selector}'
- )
+ def extract_xcom(self, pod):
+ """Retrieves xcom value and kills xcom sidecar container"""
+ result = self.launcher.extract_xcom(pod)
+ self.log.info(result)
+ return json.loads(result)
- launcher = self.create_pod_launcher()
+ def execute(self, context):
+ remote_pod = None
+ try:
+ self.pod_request_obj = self.build_pod_request_obj(context)
+ self.pod = self.get_or_create_pod( # must set `self.pod` for
`on_kill`
+ pod_request_obj=self.pod_request_obj,
+ context=context,
+ )
+ self.await_pod_start(pod=self.pod)
- if len(pod_list.items) == 1:
- try_numbers_match = self._try_numbers_match(context,
pod_list.items[0])
- final_state, remote_pod, result = self.handle_pod_overlap(
- labels, try_numbers_match, launcher, pod_list.items[0]
+ if self.get_logs:
+ self.launcher.follow_container_logs(
+ pod=self.pod,
+ container_name=self.BASE_CONTAINER_NAME,
)
else:
- self.log.info("creating pod with labels %s and launcher %s",
labels, launcher)
- final_state, remote_pod, result =
self.create_new_pod_for_operator(labels, launcher)
- if final_state != State.SUCCESS:
- raise AirflowException(f'Pod {self.pod.metadata.name} returned
a failure: {remote_pod}')
- context['task_instance'].xcom_push(key='pod_name',
value=self.pod.metadata.name)
- context['task_instance'].xcom_push(key='pod_namespace',
value=self.namespace)
- return result
- except AirflowException as ex:
- raise AirflowException(f'Pod Launching failed: {ex}')
+ self.launcher.await_container_completion(
+ pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+ )
- def handle_pod_overlap(
- self, labels: dict, try_numbers_match: bool, launcher: Any, pod:
k8s.V1Pod
- ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
- """
+ if self.do_xcom_push:
+ result = self.extract_xcom(pod=self.pod)
+ remote_pod = self.launcher.await_pod_completion(self.pod)
+ finally:
+ self.cleanup(
+ pod=self.pod or self.pod_request_obj,
+ remote_pod=remote_pod,
+ )
+ if self.do_xcom_push:
+ ti = context['ti']
+ if remote_pod:
+ ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+ ti.xcom_push(key='pod_namespace',
value=remote_pod.metadata.namespace)
+ return result
- In cases where the Scheduler restarts while a KubernetesPodOperator
task is running,
- this function will either continue to monitor the existing pod or
launch a new pod
- based on the `reattach_on_restart` parameter.
+ def cleanup(self, pod, remote_pod):
+ with _suppress(Exception):
+ self.process_pod_deletion(pod)
- :param labels: labels used to determine if a pod is repeated
- :type labels: dict
- :param try_numbers_match: do the try numbers match? Only needed for
logging purposes
- :type try_numbers_match: bool
- :param launcher: PodLauncher
- :param pod: Pod found with matching labels
- """
- if try_numbers_match:
- log_line = f"found a running pod with labels {labels} and the same
try_number."
- else:
- log_line = f"found a running pod with labels {labels} but a
different try_number."
-
- # In case of failed pods, should reattach the first time, but only once
- # as the task will have already failed.
- if self.reattach_on_restart and not
pod.metadata.labels.get("already_checked"):
- log_line += " Will attach to this pod and monitor instead of
starting new one"
- self.log.info(log_line)
- self.pod = pod
- final_state, remote_pod, result =
self.monitor_launched_pod(launcher, pod)
+ pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status')
else None
+ if pod_phase != PodStatus.SUCCEEDED:
+ if self.log_events_on_failure:
+ with _suppress(Exception):
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason,
event.message)
+ if not self.is_delete_operator_pod:
+ with _suppress(Exception):
+ self.patch_already_checked(pod)
+ raise AirflowException(f'Pod {pod and pod.metadata.name} returned
a failure: {remote_pod}')
+
+ def process_pod_deletion(self, pod):
+ if self.is_delete_operator_pod:
+ self.log.info("deleting pod")
+ self.launcher.delete_pod(pod)
else:
- log_line += f"creating pod with labels {labels} and launcher
{launcher}"
- self.log.info(log_line)
- final_state, remote_pod, result =
self.create_new_pod_for_operator(labels, launcher)
- return final_state, remote_pod, result
+ self.log.info("skipping pod delete")
Review comment:
```suggestion
self.log.info("skipping deleting pod: %s", pod.metadata.name)
```
##########
File path: kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
##########
@@ -378,9 +379,11 @@ def test_fs_group(self):
assert self.expected_pod == actual_pod
def test_faulty_service_account(self):
- bad_service_account_name = "foobar"
+ """pod creation should fail when service account does not exist"""
+ service_account = "foobar"
+ namespace = 'default'
Review comment:
```suggestion
namespace = "default"
```
nit
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +561,41 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
- return pod
- def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State,
k8s.V1Pod, Optional[str]]:
- """
- Creates a new pod and monitors for duration of task
-
- :param labels: labels used to track pod
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :return:
- """
- self.log.debug(
- "Adding KubernetesPodOperator labels to pod before launch for task
%s", self.task_id
- )
+ labels = self._create_labels_for_pod(context)
+ self.log.info("creating pod with labels %s and launcher %s", labels,
self.launcher)
Review comment:
And we might want to move this down after we finish modifying the
labels, right before we return.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -433,7 +476,24 @@ def _set_name(self, name):
validate_key(name, max_length=220)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
- def create_pod_request_obj(self) -> k8s.V1Pod:
+ def patch_already_checked(self, pod: k8s.V1Pod):
+ """Add an "already tried annotation to ensure we only retry once"""
Review comment:
```suggestion
"""Add an "already checked" annotation to ensure we don't reattach
on retries"""
```
This might be better? Basically, we're marking this pod as having been
cleaned up or handled, and subsequent retries shouldn't try to reattach to it.
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +561,41 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
- return pod
- def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State,
k8s.V1Pod, Optional[str]]:
- """
- Creates a new pod and monitors for duration of task
-
- :param labels: labels used to track pod
- :param launcher: pod launcher that will manage launching and
monitoring pods
- :return:
- """
- self.log.debug(
- "Adding KubernetesPodOperator labels to pod before launch for task
%s", self.task_id
- )
+ labels = self._create_labels_for_pod(context)
+ self.log.info("creating pod with labels %s and launcher %s", labels,
self.launcher)
Review comment:
```suggestion
self.log.info("creating pod %s with labels: %s", pod.metadata.name,
labels)
```
(untested) It'd be cool to have the pod name. Don't think logging the
launcher is helpful.
##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -320,20 +332,3 @@ def _exec_pod_command(self, resp, command: str) -> None:
self.log.info(resp.read_stderr())
Review comment:
Should we say what this is?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]