This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 5f7a55c Fix Flake8 tests for monitor pods by label
5f7a55c is described below
commit 5f7a55c19c62566ee9357c31d9069d2b01bfc126
Author: Daniel Imberman <[email protected]>
AuthorDate: Tue Jun 23 14:33:23 2020 -0700
Fix Flake8 tests for monitor pods by label
---
airflow/executors/kubernetes_executor.py | 24 +++++++++-------------
airflow/kubernetes/pod_generator.py | 6 +-----
airflow/kubernetes/pod_launcher.py | 3 +--
.../cncf/kubernetes/operators/kubernetes_pod.py | 5 ++---
kubernetes_tests/test_kubernetes_pod_operator.py | 9 +++-----
tests/executors/test_kubernetes_executor.py | 3 ---
6 files changed, 17 insertions(+), 33 deletions(-)
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 2036f4f..73c12ce 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -16,36 +16,32 @@
# under the License.
"""Kubernetes executor"""
import base64
-import hashlib
-from queue import Empty
-
-import re
import json
import multiprocessing
-from uuid import uuid4
import time
-
-from dateutil import parser
+from queue import Empty
+from uuid import uuid4
import kubernetes
+from dateutil import parser
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from urllib3.exceptions import HTTPError, ReadTimeoutError
+from airflow import settings
from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.executors.base_executor import BaseExecutor
from airflow.kubernetes import pod_generator
-from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
-from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.kube_client import get_kube_client
-from airflow.kubernetes.worker_configuration import WorkerConfiguration
+from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN
from airflow.kubernetes.pod_generator import PodGenerator
-from airflow.executors.base_executor import BaseExecutor
+from airflow.kubernetes.pod_launcher import PodLauncher
+from airflow.kubernetes.worker_configuration import WorkerConfiguration
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
-from airflow.utils.state import State
from airflow.utils.db import provide_session, create_session
-from airflow import settings
-from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
MAX_LABEL_LEN = 63
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e92918e..f6d455a 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -23,16 +23,12 @@ is supported and no serialization need be written.
import copy
import hashlib
-import inspect
-import os
import re
import uuid
-from functools import reduce
-from typing import Dict, List, Optional, Union
import kubernetes.client.models as k8s
+
from airflow.executors import Executors
-import uuid
MAX_LABEL_LEN = 63
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index 62360c5..6fb5989 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -110,7 +110,6 @@ class PodLauncher(LoggingMixin):
time.sleep(1)
self.log.debug('Pod not yet started')
-
def monitor_pod(self, pod, get_logs):
"""
:param pod: pod spec that will be monitored
@@ -205,7 +204,7 @@ class PodLauncher(LoggingMixin):
wait=tenacity.wait_exponential(),
reraise=True
)
- def read_pod(self, pod: V1Pod):
+ def read_pod(self, pod):
"""Read POD information"""
try:
return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 6742309..49d9a9f 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
-import warnings
import re
@@ -176,7 +175,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
**kwargs):
if kwargs.get('xcom_push') is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
- super().__init__(*args, resources=None, **kwargs)
+ super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
self.pod = None
self.do_xcom_push = do_xcom_push
@@ -219,7 +218,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
self.name = self._set_name(name)
@staticmethod
- def create_labels_for_pod(context) -> dict:
+ def create_labels_for_pod(context):
"""
Generate labels for the pod to track the pod in case of Operator crash
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index e737ce5..ddd6dce 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -21,15 +21,12 @@ import os
import shutil
import unittest
-from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-from tests.compat import mock, patch
-
import kubernetes.client.models as k8s
import pendulum
-import pytest
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client
from airflow.kubernetes.pod import Port
@@ -39,9 +36,9 @@ from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.models import DAG, TaskInstance
-
from airflow.utils import timezone
from airflow.version import version as airflow_version
+from tests.compat import mock, patch
# noinspection DuplicatedCode
@@ -109,7 +106,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
}
}
- def tearDown(self) -> None:
+ def tearDown(self):
client = kube_client.get_kube_client(in_cluster=False)
client.delete_collection_namespaced_pod(namespace="default")
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 4bb6fcf..cf7ba54 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -33,7 +33,6 @@ try:
from airflow.configuration import conf # noqa: F401
from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler
from airflow.executors.kubernetes_executor import KubernetesExecutor
- from airflow.executors.kubernetes_executor import KubeConfig
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.utils.state import State
@@ -101,12 +100,10 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
self.assertTrue(self._is_safe_label_value(safe_dag_id))
safe_task_id = pod_generator.make_safe_label_value(task_id)
self.assertTrue(self._is_safe_label_value(safe_task_id))
- id = "my_dag_id"
self.assertEqual(
dag_id,
pod_generator.make_safe_label_value(dag_id)
)
- id = "my_dag_id_" + "a" * 64
self.assertEqual(
"my_dag_id_" + "a" * 43 + "-0ce114c45",
pod_generator.make_safe_label_value(dag_id)