This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 5f7a55c  Fix Flake8 tests for monitor pods by label
5f7a55c is described below

commit 5f7a55c19c62566ee9357c31d9069d2b01bfc126
Author: Daniel Imberman <[email protected]>
AuthorDate: Tue Jun 23 14:33:23 2020 -0700

    Fix Flake8 tests for monitor pods by label
---
 airflow/executors/kubernetes_executor.py           | 24 +++++++++-------------
 airflow/kubernetes/pod_generator.py                |  6 +-----
 airflow/kubernetes/pod_launcher.py                 |  3 +--
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  5 ++---
 kubernetes_tests/test_kubernetes_pod_operator.py   |  9 +++-----
 tests/executors/test_kubernetes_executor.py        |  3 ---
 6 files changed, 17 insertions(+), 33 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 2036f4f..73c12ce 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -16,36 +16,32 @@
 # under the License.
 """Kubernetes executor"""
 import base64
-import hashlib
-from queue import Empty
-
-import re
 import json
 import multiprocessing
-from uuid import uuid4
 import time
-
-from dateutil import parser
+from queue import Empty
+from uuid import uuid4
 
 import kubernetes
+from dateutil import parser
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from urllib3.exceptions import HTTPError, ReadTimeoutError
 
+from airflow import settings
 from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.executors.base_executor import BaseExecutor
 from airflow.kubernetes import pod_generator
-from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
-from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
-from airflow.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN
 from airflow.kubernetes.pod_generator import PodGenerator
-from airflow.executors.base_executor import BaseExecutor
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
-from airflow.utils.state import State
 from airflow.utils.db import provide_session, create_session
-from airflow import settings
-from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
 
 MAX_LABEL_LEN = 63
 
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e92918e..f6d455a 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -23,16 +23,12 @@ is supported and no serialization need be written.
 
 import copy
 import hashlib
-import inspect
-import os
 import re
 import uuid
-from functools import reduce
-from typing import Dict, List, Optional, Union
 
 import kubernetes.client.models as k8s
+
 from airflow.executors import Executors
-import uuid
 
 MAX_LABEL_LEN = 63
 
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 62360c5..6fb5989 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -110,7 +110,6 @@ class PodLauncher(LoggingMixin):
                 time.sleep(1)
             self.log.debug('Pod not yet started')
 
-
     def monitor_pod(self, pod, get_logs):
         """
         :param pod: pod spec that will be monitored
@@ -205,7 +204,7 @@ class PodLauncher(LoggingMixin):
         wait=tenacity.wait_exponential(),
         reraise=True
     )
-    def read_pod(self, pod: V1Pod):
+    def read_pod(self, pod):
         """Read POD information"""
         try:
             return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 6742309..49d9a9f 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executes task in a Kubernetes POD"""
-import warnings
 
 import re
 
@@ -176,7 +175,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                  **kwargs):
         if kwargs.get('xcom_push') is not None:
             raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
-        super().__init__(*args, resources=None, **kwargs)
+        super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
 
         self.pod = None
         self.do_xcom_push = do_xcom_push
@@ -219,7 +218,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.name = self._set_name(name)
 
     @staticmethod
-    def create_labels_for_pod(context) -> dict:
+    def create_labels_for_pod(context):
         """
         Generate labels for the pod to track the pod in case of Operator crash
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index e737ce5..ddd6dce 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -21,15 +21,12 @@ import os
 import shutil
 import unittest
 
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from tests.compat import mock, patch
-
 import kubernetes.client.models as k8s
 import pendulum
-import pytest
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import kube_client
 from airflow.kubernetes.pod import Port
@@ -39,9 +36,9 @@ from airflow.kubernetes.secret import Secret
 from airflow.kubernetes.volume import Volume
 from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.models import DAG, TaskInstance
-
 from airflow.utils import timezone
 from airflow.version import version as airflow_version
+from tests.compat import mock, patch
 
 
 # noinspection DuplicatedCode
@@ -109,7 +106,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             }
         }
 
-    def tearDown(self) -> None:
+    def tearDown(self):
         client = kube_client.get_kube_client(in_cluster=False)
         client.delete_collection_namespaced_pod(namespace="default")
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 4bb6fcf..cf7ba54 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -33,7 +33,6 @@ try:
     from airflow.configuration import conf  # noqa: F401
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
     from airflow.executors.kubernetes_executor import KubernetesExecutor
-    from airflow.executors.kubernetes_executor import KubeConfig
     from airflow.kubernetes import pod_generator
     from airflow.kubernetes.pod_generator import PodGenerator
     from airflow.utils.state import State
@@ -101,12 +100,10 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
             self.assertTrue(self._is_safe_label_value(safe_dag_id))
             safe_task_id = pod_generator.make_safe_label_value(task_id)
             self.assertTrue(self._is_safe_label_value(safe_task_id))
-            id = "my_dag_id"
             self.assertEqual(
                 dag_id,
                 pod_generator.make_safe_label_value(dag_id)
             )
-            id = "my_dag_id_" + "a" * 64
             self.assertEqual(
                 "my_dag_id_" + "a" * 43 + "-0ce114c45",
                 pod_generator.make_safe_label_value(dag_id)

Reply via email to