This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5569b86 Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)
5569b86 is described below
commit 5569b868a990c97dfc63a0e014a814ec1cc0f953
Author: Dmytro Kazanzhy <[email protected]>
AuthorDate: Sun Jan 9 23:28:13 2022 +0200
Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)
---
.../providers/apache/hive/transfers/s3_to_hive.py | 4 +--
.../cncf/kubernetes/operators/kubernetes_pod.py | 29 ++++++++++------------
.../providers/cncf/kubernetes/utils/pod_manager.py | 18 +++++++-------
.../tableau/operators/tableau_refresh_workbook.py | 2 +-
4 files changed, 25 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/apache/hive/transfers/s3_to_hive.py b/airflow/providers/apache/hive/transfers/s3_to_hive.py
index 8362954..fe28184 100644
--- a/airflow/providers/apache/hive/transfers/s3_to_hive.py
+++ b/airflow/providers/apache/hive/transfers/s3_to_hive.py
@@ -23,7 +23,7 @@ import gzip
import os
import tempfile
from tempfile import NamedTemporaryFile, TemporaryDirectory
-from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@@ -178,7 +178,7 @@ class S3ToHiveOperator(BaseOperator):
if self.delimiter:
option['FieldDelimiter'] = self.delimiter
- input_serialization = {'CSV': option}
+ input_serialization: Dict[str, Any] = {'CSV': option}
if self.input_compressed:
input_serialization['CompressionType'] = 'GZIP'
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 2a17d88..1ae72fb 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -18,25 +18,13 @@
import json
import logging
import re
+import sys
import warnings
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
from kubernetes.client import CoreV1Api, models as k8s
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
-from airflow.settings import pod_mutation_hook
-
-try:
- import airflow.utils.yaml as yaml
-except ImportError:
- import yaml
-
-try:
- from functools import cached_property
-except ImportError:
- from cached_property import cached_property
-
from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
@@ -56,9 +44,17 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
)
from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
+from airflow.settings import pod_mutation_hook
+from airflow.utils import yaml
from airflow.utils.helpers import validate_key
from airflow.version import version as airflow_version
+if sys.version_info >= (3, 8):
+ from functools import cached_property
+else:
+ from cached_property import cached_property
+
if TYPE_CHECKING:
import jinja2
@@ -238,7 +234,7 @@ class KubernetesPodOperator(BaseOperator):
do_xcom_push: bool = False,
pod_template_file: Optional[str] = None,
priority_class_name: Optional[str] = None,
- pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None,
+ pod_runtime_info_envs: Optional[List[PodRuntimeInfoEnv]] = None,
termination_grace_period: Optional[int] = None,
configmaps: Optional[List[str]] = None,
**kwargs,
@@ -307,7 +303,7 @@ class KubernetesPodOperator(BaseOperator):
def _render_nested_template_fields(
self,
content: Any,
- context: Dict,
+ context: 'Context',
jinja_env: "jinja2.Environment",
seen_oids: set,
) -> None:
@@ -369,6 +365,7 @@ class KubernetesPodOperator(BaseOperator):
label_selector=label_selector,
).items
+ pod = None
num_pods = len(pod_list)
if num_pods > 1:
raise AirflowException(f'More than one pod running with labels {label_selector}')
@@ -377,7 +374,7 @@ class KubernetesPodOperator(BaseOperator):
self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
- return pod
+ return pod
def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
if self.reattach_on_restart:
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 3640eb2..96fd03a 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -20,7 +20,7 @@ import math
import time
from contextlib import closing
from datetime import datetime
-from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Iterable, Optional, Tuple, cast
import pendulum
import tenacity
@@ -28,9 +28,9 @@ from kubernetes import client, watch
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
-from pendulum import Date, DateTime, Duration, Time
+from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
-from requests.exceptions import BaseHTTPError
+from urllib3.exceptions import HTTPError as BaseHTTPError
from airflow.exceptions import AirflowException
from airflow.kubernetes.kube_client import get_kube_client
@@ -49,7 +49,7 @@ class PodLaunchFailedException(AirflowException):
"""When pod launching fails in KubernetesPodOperator."""
-def should_retry_start_pod(exception: Exception) -> bool:
+def should_retry_start_pod(exception: BaseException) -> bool:
"""Check if an Exception indicates a transient error and warrants retrying"""
if isinstance(exception, ApiException):
return exception.status == 409
@@ -180,7 +180,7 @@ class PodManager(LoggingMixin):
So the looping logic is there to let us resume following the logs.
"""
- def follow_logs(since_time: Optional[datetime] = None) -> Optional[datetime]:
+ def follow_logs(since_time: Optional[DateTime] = None) -> Optional[DateTime]:
"""
Tries to follow container logs until container completes.
For a long-running container, sometimes the log read may be interrupted
@@ -198,7 +198,7 @@ class PodManager(LoggingMixin):
math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
),
)
- for line in logs: # type: bytes
+ for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8'))
self.log.info(message)
except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
@@ -241,7 +241,7 @@ class PodManager(LoggingMixin):
time.sleep(2)
return remote_pod
- def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime, Duration]], str]:
+ def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]:
"""
Parse K8s log line and returns the final state
@@ -256,7 +256,7 @@ class PodManager(LoggingMixin):
timestamp = line[:split_at]
message = line[split_at + 1 :].rstrip()
try:
- last_log_time = pendulum.parse(timestamp)
+ last_log_time = cast(DateTime, pendulum.parse(timestamp))
except ParserError:
self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp")
return None, line
@@ -275,7 +275,7 @@ class PodManager(LoggingMixin):
tail_lines: Optional[int] = None,
timestamps: bool = False,
since_seconds: Optional[int] = None,
- ) -> Iterable[str]:
+ ) -> Iterable[bytes]:
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
diff --git a/airflow/providers/tableau/operators/tableau_refresh_workbook.py b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
index be9a548..7a1e29f 100644
--- a/airflow/providers/tableau/operators/tableau_refresh_workbook.py
+++ b/airflow/providers/tableau/operators/tableau_refresh_workbook.py
@@ -92,6 +92,6 @@ class TableauRefreshWorkbookOperator(BaseOperator):
check_interval=self.check_interval,
task_id='refresh_workbook',
dag=None,
- ).execute(context={})
+ ).execute(context=context)
return job_id