This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2045d6  Add more type hints to PodLauncher (#18928)
b2045d6 is described below

commit b2045d6d1d4d2424c02d7d9b40520440aa4e5070
Author: James Lamb <[email protected]>
AuthorDate: Wed Oct 13 12:35:41 2021 -0500

    Add more type hints to PodLauncher (#18928)
---
 .../cncf/kubernetes/utils/pod_launcher.py          | 32 ++++++++++++----------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py 
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index d3a7c44..671d260 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -19,11 +19,13 @@ import json
 import math
 import time
 from datetime import datetime as dt
-from typing import Optional, Tuple, Union
+from typing import Iterable, Optional, Tuple, Union
 
 import pendulum
 import tenacity
 from kubernetes import client, watch
+from kubernetes.client.models.v1_event import V1Event
+from kubernetes.client.models.v1_event_list import V1EventList
 from kubernetes.client.models.v1_pod import V1Pod
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
@@ -39,7 +41,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 
-def should_retry_start_pod(exception: Exception):
+def should_retry_start_pod(exception: Exception) -> bool:
     """Check if an Exception indicates a transient error and warrants 
retrying"""
     if isinstance(exception, ApiException):
         return exception.status == 409
@@ -78,7 +80,7 @@ class PodLauncher(LoggingMixin):
         self._watch = watch.Watch()
         self.extract_xcom = extract_xcom
 
-    def run_pod_async(self, pod: V1Pod, **kwargs):
+    def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
         """Runs POD asynchronously"""
         pod_mutation_hook(pod)
 
@@ -98,7 +100,7 @@ class PodLauncher(LoggingMixin):
             raise e
         return resp
 
-    def delete_pod(self, pod: V1Pod):
+    def delete_pod(self, pod: V1Pod) -> None:
         """Deletes POD"""
         try:
             self._client.delete_namespaced_pod(
@@ -115,7 +117,7 @@ class PodLauncher(LoggingMixin):
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
+    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
         Launches the pod synchronously and waits for completion.
 
@@ -210,22 +212,22 @@ class PodLauncher(LoggingMixin):
             return None, line
         return last_log_time, message
 
-    def _task_status(self, event):
+    def _task_status(self, event: V1Event) -> str:
         self.log.info('Event: %s had an event of type %s', 
event.metadata.name, event.status.phase)
         status = self.process_status(event.metadata.name, event.status.phase)
         return status
 
-    def pod_not_started(self, pod: V1Pod):
+    def pod_not_started(self, pod: V1Pod) -> bool:
         """Tests if pod has not started"""
         state = self._task_status(self.read_pod(pod))
         return state == State.QUEUED
 
-    def pod_is_running(self, pod: V1Pod):
+    def pod_is_running(self, pod: V1Pod) -> bool:
         """Tests if pod is running"""
         state = self._task_status(self.read_pod(pod))
         return state not in (State.SUCCESS, State.FAILED)
 
-    def base_container_is_running(self, pod: V1Pod):
+    def base_container_is_running(self, pod: V1Pod) -> bool:
         """Tests if base container is running"""
         event = self.read_pod(pod)
         status = next(iter(filter(lambda s: s.name == 'base', 
event.status.container_statuses)), None)
@@ -240,7 +242,7 @@ class PodLauncher(LoggingMixin):
         tail_lines: Optional[int] = None,
         timestamps: bool = False,
         since_seconds: Optional[int] = None,
-    ):
+    ) -> Iterable[str]:
         """Reads log from the POD"""
         additional_kwargs = {}
         if since_seconds:
@@ -265,7 +267,7 @@ class PodLauncher(LoggingMixin):
             raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod_events(self, pod):
+    def read_pod_events(self, pod: V1Pod) -> V1EventList:
         """Reads events from the POD"""
         try:
             return self._client.list_namespaced_event(
@@ -275,14 +277,14 @@ class PodLauncher(LoggingMixin):
             raise AirflowException(f'There was an error reading the kubernetes 
API: {e}')
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), 
wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod(self, pod: V1Pod):
+    def read_pod(self, pod: V1Pod) -> V1Pod:
         """Read POD information"""
         try:
             return self._client.read_namespaced_pod(pod.metadata.name, 
pod.metadata.namespace)
         except BaseHTTPError as e:
             raise AirflowException(f'There was an error reading the kubernetes 
API: {e}')
 
-    def _extract_xcom(self, pod: V1Pod):
+    def _extract_xcom(self, pod: V1Pod) -> str:
         resp = kubernetes_stream(
             self._client.connect_get_namespaced_pod_exec,
             pod.metadata.name,
@@ -304,7 +306,7 @@ class PodLauncher(LoggingMixin):
             raise AirflowException(f'Failed to extract xcom from pod: 
{pod.metadata.name}')
         return result
 
-    def _exec_pod_command(self, resp, command):
+    def _exec_pod_command(self, resp, command: str) -> None:
         if resp.is_open():
             self.log.info('Running command... %s\n', command)
             resp.write_stdin(command + '\n')
@@ -317,7 +319,7 @@ class PodLauncher(LoggingMixin):
                     break
         return None
 
-    def process_status(self, job_id, status):
+    def process_status(self, job_id: str, status: str) -> str:
         """Process status information for the JOB"""
         status = status.lower()
         if status == PodStatus.PENDING:

Reply via email to