This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b2045d6 Add more type hints to PodLauncher (#18928)
b2045d6 is described below
commit b2045d6d1d4d2424c02d7d9b40520440aa4e5070
Author: James Lamb <[email protected]>
AuthorDate: Wed Oct 13 12:35:41 2021 -0500
Add more type hints to PodLauncher (#18928)
---
.../cncf/kubernetes/utils/pod_launcher.py | 32 ++++++++++++----------
1 file changed, 17 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index d3a7c44..671d260 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -19,11 +19,13 @@ import json
import math
import time
from datetime import datetime as dt
-from typing import Optional, Tuple, Union
+from typing import Iterable, Optional, Tuple, Union
import pendulum
import tenacity
from kubernetes import client, watch
+from kubernetes.client.models.v1_event import V1Event
+from kubernetes.client.models.v1_event_list import V1EventList
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
@@ -39,7 +41,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
-def should_retry_start_pod(exception: Exception):
+def should_retry_start_pod(exception: Exception) -> bool:
"""Check if an Exception indicates a transient error and warrants
retrying"""
if isinstance(exception, ApiException):
return exception.status == 409
@@ -78,7 +80,7 @@ class PodLauncher(LoggingMixin):
self._watch = watch.Watch()
self.extract_xcom = extract_xcom
- def run_pod_async(self, pod: V1Pod, **kwargs):
+ def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
"""Runs POD asynchronously"""
pod_mutation_hook(pod)
@@ -98,7 +100,7 @@ class PodLauncher(LoggingMixin):
raise e
return resp
- def delete_pod(self, pod: V1Pod):
+ def delete_pod(self, pod: V1Pod) -> None:
"""Deletes POD"""
try:
self._client.delete_namespaced_pod(
@@ -115,7 +117,7 @@ class PodLauncher(LoggingMixin):
reraise=True,
retry=tenacity.retry_if_exception(should_retry_start_pod),
)
- def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
+ def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
"""
Launches the pod synchronously and waits for completion.
@@ -210,22 +212,22 @@ class PodLauncher(LoggingMixin):
return None, line
return last_log_time, message
- def _task_status(self, event):
+ def _task_status(self, event: V1Event) -> str:
self.log.info('Event: %s had an event of type %s',
event.metadata.name, event.status.phase)
status = self.process_status(event.metadata.name, event.status.phase)
return status
- def pod_not_started(self, pod: V1Pod):
+ def pod_not_started(self, pod: V1Pod) -> bool:
"""Tests if pod has not started"""
state = self._task_status(self.read_pod(pod))
return state == State.QUEUED
- def pod_is_running(self, pod: V1Pod):
+ def pod_is_running(self, pod: V1Pod) -> bool:
"""Tests if pod is running"""
state = self._task_status(self.read_pod(pod))
return state not in (State.SUCCESS, State.FAILED)
- def base_container_is_running(self, pod: V1Pod):
+ def base_container_is_running(self, pod: V1Pod) -> bool:
"""Tests if base container is running"""
event = self.read_pod(pod)
status = next(iter(filter(lambda s: s.name == 'base',
event.status.container_statuses)), None)
@@ -240,7 +242,7 @@ class PodLauncher(LoggingMixin):
tail_lines: Optional[int] = None,
timestamps: bool = False,
since_seconds: Optional[int] = None,
- ):
+ ) -> Iterable[str]:
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
@@ -265,7 +267,7 @@ class PodLauncher(LoggingMixin):
raise
@tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
- def read_pod_events(self, pod):
+ def read_pod_events(self, pod: V1Pod) -> V1EventList:
"""Reads events from the POD"""
try:
return self._client.list_namespaced_event(
@@ -275,14 +277,14 @@ class PodLauncher(LoggingMixin):
raise AirflowException(f'There was an error reading the kubernetes
API: {e}')
@tenacity.retry(stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(), reraise=True)
- def read_pod(self, pod: V1Pod):
+ def read_pod(self, pod: V1Pod) -> V1Pod:
"""Read POD information"""
try:
return self._client.read_namespaced_pod(pod.metadata.name,
pod.metadata.namespace)
except BaseHTTPError as e:
raise AirflowException(f'There was an error reading the kubernetes
API: {e}')
- def _extract_xcom(self, pod: V1Pod):
+ def _extract_xcom(self, pod: V1Pod) -> str:
resp = kubernetes_stream(
self._client.connect_get_namespaced_pod_exec,
pod.metadata.name,
@@ -304,7 +306,7 @@ class PodLauncher(LoggingMixin):
raise AirflowException(f'Failed to extract xcom from pod:
{pod.metadata.name}')
return result
- def _exec_pod_command(self, resp, command):
+ def _exec_pod_command(self, resp, command: str) -> None:
if resp.is_open():
self.log.info('Running command... %s\n', command)
resp.write_stdin(command + '\n')
@@ -317,7 +319,7 @@ class PodLauncher(LoggingMixin):
break
return None
- def process_status(self, job_id, status):
+ def process_status(self, job_id: str, status: str) -> str:
"""Process status information for the JOB"""
status = status.lower()
if status == PodStatus.PENDING: