[AIRFLOW-1314] Small cleanup to address PR comments (#24) * Small cleanup to address PR comments
* Remove use of enum * Change back to 3.4 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/309f764a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/309f764a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/309f764a Branch: refs/heads/master Commit: 309f764aa393a78303bd97b9fc2b985e93aac332 Parents: c0920ef Author: Benjamin Goldberg <[email protected]> Authored: Fri Oct 27 15:43:34 2017 -0500 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:23:06 2018 +0200 ---------------------------------------------------------------------- .travis.yml | 2 +- .../contrib/executors/kubernetes_executor.py | 103 ++++++++++++------- airflow/contrib/kubernetes/kube_client.py | 3 +- .../kubernetes_request_factory.py | 16 +-- .../pod_request_factory.py | 5 +- airflow/contrib/kubernetes/pod.py | 24 ++--- airflow/contrib/kubernetes/pod_launcher.py | 21 ++-- .../operators/kubernetes/pod_operator.py | 38 +++---- airflow/executors/base_executor.py | 2 +- .../executors/test_kubernetes_executor.py | 11 +- 10 files changed, 131 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index ec2d44d..6fd2d50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -86,7 +86,7 @@ matrix: - python: "2.7" env: TOX_ENV=py34-hdp-airflow_backend_postgres - python: "3.4" - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true + env: TOX_ENV=py34-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true allow_failures: - env: TOX_ENV=py27-cdh-airflow_backend_postgres RUN_KUBE_INTEGRATION=true cache: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 1e3e319..a5aa1e1 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -109,7 +109,8 @@ class KubeConfig: self.kubernetes_section, 'worker_container_tag') self.kube_image = '{}:{}'.format( self.worker_container_repository, self.worker_container_tag) - self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True) + self.delete_worker_pods = self.safe_getboolean( + self.kubernetes_section, 'delete_worker_pods', True) self.worker_service_account_name = self.safe_get( self.kubernetes_section, 'worker_service_account_name', 'default') @@ -132,7 +133,8 @@ class KubeConfig: self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None) # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath - self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None) + self.dags_volume_subpath = self.safe_get( + self.kubernetes_section, 'dags_volume_subpath', None) # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your # cluster has RBAC enabled, your scheduler may need service account permissions to @@ -143,7 +145,8 @@ class KubeConfig: # interact with cluster components. self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default') # Task secrets managed by KubernetesExecutor. - self.gcp_service_account_keys = self.safe_get(self.kubernetes_section, 'gcp_service_account_keys', None) + self.gcp_service_account_keys = self.safe_get( + self.kubernetes_section, 'gcp_service_account_keys', None) # If the user is using the git-sync container to clone their repository via git, # allow them to specify repository, tag, and pod name for the init container. @@ -167,10 +170,8 @@ class KubeConfig: def _validate(self): if not self.dags_volume_claim and (not self.git_repo or not self.git_branch): raise AirflowConfigException( - "In kubernetes mode you must set the following configs in the `kubernetes` section: " - "`dags_volume_claim` or " - "`git_repo and git_branch` " - ) + "In kubernetes mode the following must be set in the `kubernetes` config section: " + "`dags_volume_claim` or `git_repo and git_branch` ") class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): @@ -193,7 +194,9 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): "last resource_version: {}".format(self.resource_version)) def _run(self, kube_client, resource_version): - self.log.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version)) + self.log.info( + "Event: and now my watch begins starting at resource_version: {}" + .format(resource_version)) watcher = watch.Watch() kwargs = {"label_selector": "airflow-slave"} @@ -203,9 +206,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): last_resource_version = None for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs): task = event['object'] - self.log.info("Event: {} had an event of type {}".format(task.metadata.name, event['type'])) + self.log.info( + "Event: {} had an event of type {}".format(task.metadata.name, event['type'])) self.process_status( - task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version + task.metadata.name, task.status.phase, task.metadata.labels, + task.metadata.resource_version ) last_resource_version = task.metadata.resource_version @@ -224,7 +229,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): self.log.info("Event: {} is Running".format(pod_id)) else: self.log.warn("Event: Invalid state: {} on pod: {} with labels: {} " - "with resource_version: {}".format(status, pod_id, labels, resource_version)) + "with resource_version: {}" + .format(status, pod_id, labels, resource_version)) class AirflowKubernetesScheduler(LoggingMixin, object): @@ -252,7 +258,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object): if self.kube_watcher.is_alive(): pass else: - self.log.error("Error while health checking kube watcher process. Process died for unknown reasons") + self.log.error( + "Error while health checking kube watcher process. " + "Process died for unknown reasons") self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job): @@ -262,9 +270,6 @@ class AirflowKubernetesScheduler(LoggingMixin, object): It will then create a unique job-id, launch that job in the cluster, and store relevent info in the current_jobs map so we can track the job's status - - :return: - """ self.log.info('k8s: job is {}'.format(str(next_job))) key, command, kube_executor_config = next_job @@ -273,7 +278,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image)) pod = self.worker_configuration.make_pod( namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id), - dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date), + dag_id=dag_id, task_id=task_id, + execution_date=self._datetime_to_label_safe_datestring(execution_date), airflow_command=command, kube_executor_config=kube_executor_config ) # the watcher will monitor pods, so we do not block. @@ -283,7 +289,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): def delete_pod(self, pod_id): if self.kube_config.delete_worker_pods: try: - self.kube_client.delete_namespaced_pod(pod_id, self.namespace, body=client.V1DeleteOptions()) + self.kube_client.delete_namespaced_pod( + pod_id, self.namespace, body=client.V1DeleteOptions()) except ApiException as e: if e.status != 404: raise @@ -304,7 +311,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object): def process_watcher_task(self): pod_id, state, labels, resource_version = self.watcher_queue.get() - self.log.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels)) + self.log.info( + "Attempting to finish pod; pod_id: {}; state: {}; labels: {}" + .format(pod_id, state, labels)) key = self._labels_to_key(labels) if key: self.log.debug("finishing job {} - {} ({})".format(key, state, pod_id)) @@ -314,10 +323,12 @@ class AirflowKubernetesScheduler(LoggingMixin, object): def _strip_unsafe_kubernetes_special_chars(string): """ Kubernetes only supports lowercase alphanumeric characters and "-" and "." in the pod name - However, there are special rules about how "-" and "." can be used so let's only keep alphanumeric chars - see here for detail: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ - :param string: - :return: + However, there are special rules about how "-" and "." can be used so let's only keep + alphanumeric chars see here for detail: + https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ + + :param string: The requested Pod name + :return: ``str`` Pod name stripped of any unsafe characters """ return ''.join(ch.lower() for ind, ch in enumerate(string) if ch.isalnum()) @@ -326,10 +337,11 @@ class AirflowKubernetesScheduler(LoggingMixin, object): """ Kubernetes pod names must be <= 253 chars and must pass the following regex for validation "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" + :param safe_dag_id: a dag_id with only alphanumeric characters :param safe_task_id: a task_id with only alphanumeric characters :param random_uuid: a uuid - :return: + :return: ``str`` valid Pod name of appropriate length """ MAX_POD_ID_LEN = 253 @@ -349,7 +361,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object): @staticmethod def _label_safe_datestring_to_datetime(string): """ - Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_" + Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not "_", let's + replace ":" with "_" + :param string: string :return: datetime.datetime object """ @@ -358,7 +372,8 @@ class AirflowKubernetesScheduler(LoggingMixin, object): @staticmethod def _datetime_to_label_safe_datestring(datetime_obj): """ - Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's replace ":" with "_" + Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but not "_" let's + replace ":" with "_" :param datetime_obj: datetime.datetime object :return: ISO-like string representing the datetime """ @@ -366,7 +381,9 @@ class AirflowKubernetesScheduler(LoggingMixin, object): def _labels_to_key(self, labels): try: - return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"]) + return ( + labels["dag_id"], labels["task_id"], + self._label_safe_datestring_to_datetime(labels["execution_date"])) except Exception as e: self.log.warn("Error while converting labels to key; labels: {}; exception: {}".format( labels, e @@ -386,23 +403,32 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): def clear_not_launched_queued_tasks(self): """ - If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched - Thus, on starting up the scheduler let's check every "Queued" task to see if it has been launched - (ie: if there is a corresponding pod on kubernetes) - If it has been launched then do nothing, otherwise reset the state to "None" so the task will be rescheduled - This will not be necessary in a future version of airflow in which there is proper support for State.LAUNCHED - :return: None + If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not + have been launched Thus, on starting up the scheduler let's check every "Queued" task to + see if it has been launched (ie: if there is a corresponding pod on kubernetes) + + If it has been launched then do nothing, otherwise reset the state to "None" so the task + will be rescheduled + + This will not be necessary in a future version of airflow in which there is proper support + for State.LAUNCHED """ - queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all() - self.log.info("When executor started up, found {} queued task instances".format(len(queued_tasks))) + queued_tasks = self._session.query( + TaskInstance).filter(TaskInstance.state == State.QUEUED).all() + self.log.info( + "When executor started up, found {} queued task instances".format(len(queued_tasks))) for t in queued_tasks: kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format( - t.dag_id, t.task_id, AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date) + t.dag_id, t.task_id, + AirflowKubernetesScheduler._datetime_to_label_safe_datestring(t.execution_date) )) - pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs) + pod_list = self.kube_client.list_namespaced_pod( + self.kube_config.kube_namespace, **kwargs) if len(pod_list.items) == 0: - self.log.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t)) + self.log.info( + "TaskInstance: {} found in queued state but was not launched, rescheduling" + .format(t)) self._session.query(TaskInstance).filter( TaskInstance.dag_id == t.dag_id, TaskInstance.task_id == t.task_id, @@ -473,7 +499,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): self.log.info("Changing state of {}".format(results)) self._change_state(key, state, pod_id) - KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session) + KubeResourceVersion.checkpoint_resource_version( + last_resource_version, session=self._session) if not self.task_queue.empty(): key, command, kube_executor_config = self.task_queue.get() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index b01e14d..9603963 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -22,4 +22,5 @@ def get_kube_client(in_cluster=True): config.load_incluster_config() return client.CoreV1Api() else: - NotImplementedError("Running kubernetes jobs from not within the cluster is not supported at this time") + NotImplementedError( + "Running kubernetes jobs from not within the cluster is not supported at this time") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 67ff15c..b5ab074 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -17,19 +17,18 @@ import six class KubernetesRequestFactory(): """ - Create requests to be sent to kube API. Extend this class - to talk to kubernetes and generate your specific resources. - This is equivalent of generating yaml files that can be used - by `kubectl` + Create requests to be sent to kube API. Extend this class to talk to kubernetes and generate + your specific resources. This is equivalent of generating yaml files that can be used by + `kubectl` """ __metaclass__ = ABCMeta @abstractmethod def create(self, pod): """ - Creates the request for kubernetes API. + Creates the request for kubernetes API. - :param pod: The pod object + :param pod: The pod object """ pass @@ -89,7 +88,10 @@ class KubernetesRequestFactoryHelper(object): @staticmethod def attach_volume_mounts(pod, req): - req['spec']['volumes'] = pod.volumes + if len(pod.volume_mounts) > 0: + req['spec']['containers'][0]['volumeMounts'] = ( + req['spec']['containers'][0].get('volumeMounts', [])) + req['spec']['containers'][0]['volumeMounts'].extend(pod.volume_mounts) @staticmethod def extract_name(pod, req): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 2b1756a..ea6b94b 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -12,13 +12,14 @@ # See the License for the specific language governing permissions and import yaml -from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory +from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import ( + KubernetesRequestFactory) from airflow.contrib.kubernetes.pod import Pod class SimplePodRequestFactory(KubernetesRequestFactory): """ - Request generator for a simple pod. + Request generator for a simple pod. """ _yaml = """apiVersion: v1 kind: Pod http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 56a3114..1877da7 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -32,19 +32,19 @@ class Resources: class Pod: """ - Represents a kubernetes pod and manages execution of a single pod. - :param image: The docker image - :type image: str - :param env: A dict containing the environment variables - :type env: dict - :param cmds: The command to be run on the pod - :type cmd: list str - :param secrets: Secrets to be launched to the pod - :type secrets: list Secret - :param result: The result that will be returned to the operator after - successful execution of the pod - :type result: any + Represents a kubernetes pod and manages execution of a single pod. + :param image: The docker image + :type image: str + :param env: A dict containing the environment variables + :type env: dict + :param cmds: The command to be run on the pod + :type cmd: list str + :param secrets: Secrets to be launched to the pod + :type secrets: list Secret + :param result: The result that will be returned to the operator after + successful execution of the pod + :type result: any """ pod_timeout = 3600 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index 1fcdb10..1903060 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -11,10 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json from airflow.contrib.kubernetes.pod import Pod -from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory +from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import ( + SimplePodRequestFactory) from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State from kubernetes import watch @@ -24,6 +26,13 @@ from kubernetes.client.rest import ApiException from .kube_client import get_kube_client +class PodStatus(object): + PENDING = 'pending' + RUNNING = 'running' + FAILED = 'failed' + SUCCEEDED = 'succeeded' + + class PodLauncher(LoggingMixin): def __init__(self, kube_client=None): self.kube_req_factory = SimplePodRequestFactory() @@ -44,7 +53,7 @@ class PodLauncher(LoggingMixin): def run_pod(self, pod): # type: (Pod) -> State """ - Launches the pod synchronously and waits for completion. + Launches the pod synchronously and waits for completion. """ resp = self.run_pod_async(pod) final_status = self._monitor_pod(pod) @@ -70,15 +79,15 @@ class PodLauncher(LoggingMixin): return self._client.read_namespaced_pod(pod.name, pod.namespace) def process_status(self, job_id, status): - if status == 'Pending': + if status == PodStatus.PENDING: return State.QUEUED - elif status == 'Failed': + elif status == PodStatus.FAILED: self.log.info("Event: {} Failed".format(job_id)) return State.FAILED - elif status == 'Succeeded': + elif status == PodStatus.SUCCEEDED: self.log.info("Event: {} Succeeded".format(job_id)) return State.SUCCESS - elif status == 'Running': + elif status == PodStatus.RUNNING: return State.RUNNING else: self.log.info("Event: Invalid state {} on job {}".format(status, job_id)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/contrib/operators/kubernetes/pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes/pod_operator.py b/airflow/contrib/operators/kubernetes/pod_operator.py index 8b7a55f..0db8c6d 100644 --- a/airflow/contrib/operators/kubernetes/pod_operator.py +++ b/airflow/contrib/operators/kubernetes/pod_operator.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.exceptions import AirflowException from airflow.operators.python_operator import PythonOperator from airflow.utils.decorators import apply_defaults @@ -24,16 +22,16 @@ from airflow.utils.state import State class PodOperator(PythonOperator): """ - Executes a pod and waits for the job to finish. - :param dag_run_id: The unique run ID that would be attached to the pod as a label - :type dag_run_id: str - :param pod_factory: Reference to the function that creates the pod with format: - function (OpContext) => Pod - :type pod_factory: callable - :param cache_output: If set to true, the output of the pod would be saved in a - cache object using md5 hash of all the pod parameters - and in case of success, the cached results will be returned - on consecutive calls. Only use this + Executes a pod and waits for the job to finish. + + :param dag_run_id: The unique run ID that would be attached to the pod as a label + :type dag_run_id: str + :param pod_factory: Reference to the function that creates the pod with format: + function (OpContext) => Pod + :type pod_factory: callable + :param cache_output: If set to true, the output of the pod would be saved in a + cache object using md5 hash of all the pod parameters and in case of success, the cached + results will be returned on consecutive calls. Only use this """ # template_fields = tuple('dag_run_id') ui_color = '#8da7be' @@ -56,7 +54,6 @@ class PodOperator(PythonOperator): provide_context=True, *args, **kwargs) - self.logger = logging.getLogger(self.__class__.__name__) self.pod = pod self.dag_run_id = dag_run_id self.pod_launcher = PodLauncher() @@ -92,13 +89,12 @@ class PodOperator(PythonOperator): def on_pod_success(self, context): """ - Called when pod is executed successfully. - - If you want to access return values for XCOM, place values - in accessible file system or DB and override this function. - - :return: Returns a custom return value for pod which will - be stored in xcom - + Called when pod is executed successfully. + + If you want to access return values for XCOM, place values + in accessible file system or DB and override this function. + + :return: Returns a custom return value for pod which will + be stored in xcom """ return self._on_pod_success_func(context=context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 1ff4c21..7f00e93 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -53,7 +53,7 @@ class BaseExecutor(LoggingMixin): self.log.info("Adding to queue: %s", command) self.queued_tasks[key] = (command, priority, queue, task_instance) else: - self.logger.info("could not queue task {}".format(key)) + self.log.info("could not queue task {}".format(key)) def queue_task_instance( self, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/309f764a/tests/contrib/executors/test_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index a60489e..4c9728e 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -48,15 +48,16 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): def _is_valid_name(self, name): regex = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" - return len(name) <= 253 and \ - all(ch.lower() == ch for ch in name) and \ - re.match(regex, name) + return ( + len(name) <= 253 and + all(ch.lower() == ch for ch in name) and + re.match(regex, name)) @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') def test_create_pod_id(self): for dag_id, task_id in self._cases(): pod_name = AirflowKubernetesScheduler._create_pod_id(dag_id, task_id) - assert self._is_valid_name(pod_name) + self.assertTrue(self._is_valid_name(pod_name)) @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed") def test_execution_date_serialize_deserialize(self): @@ -64,7 +65,7 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): serialized_datetime = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(datetime_obj) new_datetime_obj = AirflowKubernetesScheduler._label_safe_datestring_to_datetime(serialized_datetime) - assert datetime_obj == new_datetime_obj + self.assertEquals(datetime_obj, new_datetime_obj) if __name__ == '__main__':
