This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5569b86  Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)
5569b86 is described below

commit 5569b868a990c97dfc63a0e014a814ec1cc0f953
Author: Dmytro Kazanzhy <[email protected]>
AuthorDate: Sun Jan 9 23:28:13 2022 +0200

    Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)
---
 .../providers/apache/hive/transfers/s3_to_hive.py  |  4 +--
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 29 ++++++++++------------
 .../providers/cncf/kubernetes/utils/pod_manager.py | 18 +++++++-------
 .../tableau/operators/tableau_refresh_workbook.py  |  2 +-
 4 files changed, 25 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/apache/hive/transfers/s3_to_hive.py b/airflow/providers/apache/hive/transfers/s3_to_hive.py
index 8362954..fe28184 100644
--- a/airflow/providers/apache/hive/transfers/s3_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/s3_to_hive.py
@@ -23,7 +23,7 @@ import gzip
 import os
 import tempfile
 from tempfile import NamedTemporaryFile, TemporaryDirectory
-from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -178,7 +178,7 @@ class S3ToHiveOperator(BaseOperator):
                 if self.delimiter:
                     option['FieldDelimiter'] = self.delimiter
 
-                input_serialization = {'CSV': option}
+                input_serialization: Dict[str, Any] = {'CSV': option}
                 if self.input_compressed:
                     input_serialization['CompressionType'] = 'GZIP'
 
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 2a17d88..1ae72fb 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -18,25 +18,13 @@
 import json
 import logging
 import re
+import sys
 import warnings
 from contextlib import AbstractContextManager
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
 
 from kubernetes.client import CoreV1Api, models as k8s
 
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
-from airflow.settings import pod_mutation_hook
-
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
-
-try:
-    from functools import cached_property
-except ImportError:
-    from cached_property import cached_property
-
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import kube_client, pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
@@ -56,9 +44,17 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
 )
 from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
 from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
+from airflow.settings import pod_mutation_hook
+from airflow.utils import yaml
 from airflow.utils.helpers import validate_key
 from airflow.version import version as airflow_version
 
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
 if TYPE_CHECKING:
     import jinja2
 
@@ -238,7 +234,7 @@ class KubernetesPodOperator(BaseOperator):
         do_xcom_push: bool = False,
         pod_template_file: Optional[str] = None,
         priority_class_name: Optional[str] = None,
-        pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None,
+        pod_runtime_info_envs: Optional[List[PodRuntimeInfoEnv]] = None,
         termination_grace_period: Optional[int] = None,
         configmaps: Optional[List[str]] = None,
         **kwargs,
@@ -307,7 +303,7 @@ class KubernetesPodOperator(BaseOperator):
     def _render_nested_template_fields(
         self,
         content: Any,
-        context: Dict,
+        context: 'Context',
         jinja_env: "jinja2.Environment",
         seen_oids: set,
     ) -> None:
@@ -369,6 +365,7 @@ class KubernetesPodOperator(BaseOperator):
             label_selector=label_selector,
         ).items
 
+        pod = None
         num_pods = len(pod_list)
         if num_pods > 1:
             raise AirflowException(f'More than one pod running with labels {label_selector}')
@@ -377,7 +374,7 @@ class KubernetesPodOperator(BaseOperator):
             self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
             self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
             self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
-            return pod
+        return pod
 
     def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
         if self.reattach_on_restart:
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3640eb2..96fd03a 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -20,7 +20,7 @@ import math
 import time
 from contextlib import closing
 from datetime import datetime
-from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Tuple, cast
 
 import pendulum
 import tenacity
@@ -28,9 +28,9 @@ from kubernetes import client, watch
 from kubernetes.client.models.v1_pod import V1Pod
 from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream as kubernetes_stream
-from pendulum import Date, DateTime, Duration, Time
+from pendulum import DateTime
 from pendulum.parsing.exceptions import ParserError
-from requests.exceptions import BaseHTTPError
+from urllib3.exceptions import HTTPError as BaseHTTPError
 
 from airflow.exceptions import AirflowException
 from airflow.kubernetes.kube_client import get_kube_client
@@ -49,7 +49,7 @@ class PodLaunchFailedException(AirflowException):
     """When pod launching fails in KubernetesPodOperator."""
 
 
-def should_retry_start_pod(exception: Exception) -> bool:
+def should_retry_start_pod(exception: BaseException) -> bool:
     """Check if an Exception indicates a transient error and warrants retrying"""
     if isinstance(exception, ApiException):
         return exception.status == 409
@@ -180,7 +180,7 @@ class PodManager(LoggingMixin):
             So the looping logic is there to let us resume following the logs.
         """
 
-        def follow_logs(since_time: Optional[datetime] = None) -> Optional[datetime]:
+        def follow_logs(since_time: Optional[DateTime] = None) -> Optional[DateTime]:
             """
             Tries to follow container logs until container completes.
             For a long-running container, sometimes the log read may be interrupted
@@ -198,7 +198,7 @@ class PodManager(LoggingMixin):
                         math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                     ),
                 )
-                for line in logs:  # type: bytes
+                for line in logs:
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
                     self.log.info(message)
             except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
@@ -241,7 +241,7 @@ class PodManager(LoggingMixin):
             time.sleep(2)
         return remote_pod
 
-    def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime, Duration]], str]:
+    def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]:
         """
         Parse K8s log line and returns the final state
 
@@ -256,7 +256,7 @@ class PodManager(LoggingMixin):
         timestamp = line[:split_at]
         message = line[split_at + 1 :].rstrip()
         try:
-            last_log_time = pendulum.parse(timestamp)
+            last_log_time = cast(DateTime, pendulum.parse(timestamp))
         except ParserError:
             self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp")
             return None, line
@@ -275,7 +275,7 @@ class PodManager(LoggingMixin):
         tail_lines: Optional[int] = None,
         timestamps: bool = False,
         since_seconds: Optional[int] = None,
-    ) -> Iterable[str]:
+    ) -> Iterable[bytes]:
         """Reads log from the POD"""
         additional_kwargs = {}
         if since_seconds:
diff --git a/airflow/providers/tableau/operators/tableau_refresh_workbook.py b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
index be9a548..7a1e29f 100644
--- a/airflow/providers/tableau/operators/tableau_refresh_workbook.py
+++ b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
@@ -92,6 +92,6 @@ class TableauRefreshWorkbookOperator(BaseOperator):
             check_interval=self.check_interval,
             task_id='refresh_workbook',
             dag=None,
-        ).execute(context={})
+        ).execute(context=context)
 
         return job_id

Reply via email to