[AIRFLOW-1314] Add executor_config and tests

* Added executor_config to the task_instance table and the base_operator table
* Fix test; bump up number of examples
* Fix up comments from PR
* Exclude the kubernetes example dag from a test
* Fix dict -> KubernetesExecutorConfig
* fixed up executor_config comment and type hint

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c0920efc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c0920efc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c0920efc

Branch: refs/heads/master
Commit: c0920efc012468681cff3d3c9cfe25c7381dc976
Parents: ad4e67c
Author: Grant Nicholas <[email protected]>
Authored: Fri Oct 27 12:13:27 2017 -0500
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Apr 22 10:23:06 2018 +0200

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py | 67 +++++++++++++++++---
 airflow/contrib/executors/mesos_executor.py | 2 +-
 .../kubernetes_request_factory.py | 21 ++++++
 .../pod_request_factory.py | 1 +
 airflow/contrib/kubernetes/pod.py | 22 ++++++-
 .../contrib/kubernetes/worker_configuration.py | 16 +++--
 .../example_dags/example_kubernetes_executor.py | 66 +++++++++++++++++++
 airflow/executors/__init__.py | 20 ++++--
 airflow/executors/base_executor.py | 4 +-
 airflow/executors/celery_executor.py | 2 +-
 airflow/executors/dask_executor.py | 2 +-
 airflow/executors/local_executor.py | 4 +-
 airflow/executors/sequential_executor.py | 2 +-
 ...7c24_add_executor_config_to_task_instance.py | 44 +++++++++++++
 airflow/models.py | 10 +++
 scripts/ci/kubernetes/docker/Dockerfile_zip | 20 ++++++
 scripts/ci/kubernetes/docker/build.sh | 4 +-
 .../test_kubernetes_executor_integration.py | 8 +++
 tests/jobs.py | 2 +-
 19 files changed, 288 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 9675e81..1e3e319 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -19,18 +19,65 @@
 import six
 from queue import Queue
 from dateutil import parser
 from uuid import uuid4
+import kubernetes
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
 from airflow.contrib.kubernetes.pod_launcher import PodLauncher
 from airflow.contrib.kubernetes.kube_client import get_kube_client
 from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 from airflow.executors.base_executor import BaseExecutor
+from airflow.executors import Executors
 from airflow.models import TaskInstance, KubeResourceVersion
 from airflow.utils.state import State
 from airflow import configuration, settings
 from airflow.exceptions import AirflowConfigException
+from airflow.contrib.kubernetes.pod import Pod, Resources
 from airflow.utils.log.logging_mixin import LoggingMixin
+
+class KubernetesExecutorConfig:
+
+    def __init__(self, image=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None):
+        self.image = image
+        self.request_memory = request_memory
+        self.request_cpu = request_cpu
+        self.limit_memory = limit_memory
+        self.limit_cpu = limit_cpu
+
+    def __repr__(self):
+        return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, limit_cpu={})".format(
+            KubernetesExecutorConfig.__name__,
+            self.image, self.request_memory, self.request_cpu, self.limit_memory,self.limit_cpu
+        )
+
+    @staticmethod
+    def from_dict(obj):
+        if obj is None:
+            return KubernetesExecutorConfig()
+
+        if not isinstance(obj, dict):
+            raise TypeError("Cannot convert a non-dictionary object into a KubernetesExecutorConfig")
+
+        namespaced = obj.get(Executors.KubernetesExecutor, {})
+
+        return KubernetesExecutorConfig(
+            image=namespaced.get("image", None),
+            request_memory=namespaced.get("request_memory", None),
+            request_cpu=namespaced.get("request_cpu", None),
+            limit_memory=namespaced.get("limit_memory", None),
+            limit_cpu=namespaced.get("limit_cpu", None)
+        )
+
+    def as_dict(self):
+        return {
+            "image": self.image,
+            "request_memory": self.request_memory,
+            "request_cpu": self.request_cpu,
+            "limit_memory": self.limit_memory,
+            "limit_cpu": self.limit_cpu
+        }
+
+
 class KubeConfig:
     core_section = "core"
     kubernetes_section = "kubernetes"
@@ -219,15 +266,15 @@ class AirflowKubernetesScheduler(LoggingMixin, object):
         :return:
         """
-        self.log.debug('k8s: job is {}'.format(str(next_job)))
-        key, command = next_job
+        self.log.info('k8s: job is {}'.format(str(next_job)))
+        key, command, kube_executor_config = next_job
         dag_id, task_id, execution_date = key
         self.log.debug("k8s: running for command {}".format(command))
         self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
         pod = self.worker_configuration.make_pod(
             namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
             dag_id=dag_id, task_id=task_id,
             execution_date=self._datetime_to_label_safe_datestring(execution_date),
-            airflow_command=command
+            airflow_command=command, kube_executor_config=kube_executor_config
         )
         # the watcher will monitor pods, so we do not block.
self.launcher.run_pod_async(pod) @@ -405,9 +452,13 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): self._inject_secrets() self.clear_not_launched_queued_tasks() - def execute_async(self, key, command, queue=None): - self.log.info("k8s: adding task {} with command {}".format(key, command)) - self.task_queue.put((key, command)) + + def execute_async(self, key, command, queue=None, executor_config=None): + self.log.info("k8s: adding task {} with command {} with executor_config {}".format( + key, command, executor_config + )) + kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config) + self.task_queue.put((key, command, kube_executor_config)) def sync(self): self.log.info("self.running: {}".format(self.running)) @@ -425,8 +476,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session) if not self.task_queue.empty(): - key, command = self.task_queue.get() - self.kube_scheduler.run_next((key, command)) + key, command, kube_executor_config = self.task_queue.get() + self.kube_scheduler.run_next((key, command, kube_executor_config)) def _change_state(self, key, state, pod_id): if state != State.RUNNING: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/executors/mesos_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py index c022f6a..e1919fa 100644 --- a/airflow/contrib/executors/mesos_executor.py +++ b/airflow/contrib/executors/mesos_executor.py @@ -285,7 +285,7 @@ class MesosExecutor(BaseExecutor, LoginMixin): self.mesos_driver = driver self.mesos_driver.start() - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, executor_config=None): self.task_queue.put((key, command)) def sync(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 9cfd77f..67ff15c 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -127,6 +127,27 @@ class KubernetesRequestFactoryHelper(object): req['spec']['containers'][0]['env'] = env @staticmethod + def extract_resources(pod, req): + if not pod.resources or pod.resources.is_empty_resource_request(): + return + + req['spec']['containers'][0]['resources'] = {} + + if pod.resources.has_requests(): + req['spec']['containers'][0]['resources']['requests'] = {} + if pod.resources.request_memory: + req['spec']['containers'][0]['resources']['requests']['memory'] = pod.resources.request_memory + if pod.resources.request_cpu: + req['spec']['containers'][0]['resources']['requests']['cpu'] = pod.resources.request_cpu + + if pod.resources.has_limits(): + req['spec']['containers'][0]['resources']['limits'] = {} + if pod.resources.request_memory: + req['spec']['containers'][0]['resources']['limits']['memory'] = pod.resources.limit_memory + if pod.resources.request_cpu: + req['spec']['containers'][0]['resources']['limits']['cpu'] = pod.resources.limit_cpu + 
+ @staticmethod def extract_init_containers(pod, req): if pod.init_containers: req['spec']['initContainers'] = pod.init_containers http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index dfa247f..2b1756a 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -52,6 +52,7 @@ spec: self.extract_volume_secrets(pod, req) self.attach_volumes(pod, req) self.attach_volume_mounts(pod, req) + self.extract_resources(pod, req) self.extract_service_account_name(pod, req) self.extract_init_containers(pod, req) self.extract_image_pull_secrets(pod, req) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index be99bbf..56a3114 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -13,6 +13,23 @@ # limitations under the License. +class Resources: + def __init__(self, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None): + self.request_memory = request_memory + self.request_cpu = request_cpu + self.limit_memory = limit_memory + self.limit_cpu = limit_cpu + + def is_empty_resource_request(self): + return not self.has_limits() and not self.has_requests() + + def has_limits(self): + return self.limit_cpu is not None or self.limit_memory is not None + + def has_requests(self): + return self.request_cpu is not None or self.request_memory is not None + + class Pod: """ Represents a kubernetes pod and manages execution of a single pod. 
@@ -46,7 +63,9 @@ class Pod: image_pull_policy="IfNotPresent", image_pull_secrets=None, init_containers=None, - service_account_name=None): + service_account_name=None, + resources=None + ): self.image = image self.envs = envs if envs else {} self.cmds = cmds @@ -61,3 +80,4 @@ class Pod: self.image_pull_secrets = image_pull_secrets self.init_containers = init_containers self.service_account_name = service_account_name + self.resources = resources or Resources() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 5e87941..f59576a 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -16,7 +16,7 @@ import copy import os import six -from airflow.contrib.kubernetes.pod import Pod +from airflow.contrib.kubernetes.pod import Pod, Resources from airflow.contrib.kubernetes.secret import Secret @@ -133,13 +133,20 @@ class WorkerConfiguration: return [] return self.kube_config.image_pull_secrets.split(',') - def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command): + def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command, kube_executor_config): volumes, volume_mounts = self._get_volumes_and_mounts() worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts)) + resources = Resources( + request_memory=kube_executor_config.request_memory, + request_cpu=kube_executor_config.request_cpu, + limit_memory=kube_executor_config.limit_memory, + limit_cpu=kube_executor_config.limit_cpu + ) + return Pod( namespace=namespace, name=pod_id, - image=self.kube_config.kube_image, + image=kube_executor_config.image or self.kube_config.kube_image, cmds=["bash", "-cx", "--"], args=[airflow_command], labels={ @@ -154,5 +161,6 @@ class WorkerConfiguration: image_pull_secrets=self.kube_config.image_pull_secrets, init_containers=worker_init_container_spec, volumes=volumes, - volume_mounts=volume_mounts + volume_mounts=volume_mounts, + resources=resources ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/example_dags/example_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py new file mode 100644 index 0000000..31bc2fb --- /dev/null +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import print_function +import airflow +from airflow.operators.python_operator import PythonOperator +from airflow.models import DAG +from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig +import os + + +args = { + 'owner': 'airflow', + 'start_date': airflow.utils.dates.days_ago(2) +} + +dag = DAG( + dag_id='example_kubernetes_executor', default_args=args, + schedule_interval=None +) + + +def print_stuff(): + print("stuff!") + + +def use_zip_binary(): + rc = os.system("zip") + assert rc == 0 + + + +# You don't have to use any special KubernetesExecutor configuration if you don't want to +start_task = PythonOperator( + task_id="start_task", python_callable=print_stuff, dag=dag +) + +# But you can if you want to +one_task = PythonOperator( + task_id="one_task", python_callable=print_stuff, dag=dag, + executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}} +) + +# Use the zip binary, which is only found in this special docker image +two_task = PythonOperator( + task_id="two_task", python_callable=use_zip_binary, dag=dag, + executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}} +) + +# Limit resources on this operator/task +three_task = PythonOperator( + task_id="three_task", python_callable=print_stuff, dag=dag, + executor_config={"KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} +) + +start_task.set_downstream([one_task, two_task, three_task]) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index e8d1c32..047da6f 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -50,6 +50,14 @@ def GetDefaultExecutor(): return DEFAULT_EXECUTOR +class Executors: + LocalExecutor = "LocalExecutor" + SequentialExecutor = "SequentialExecutor" + CeleryExecutor = "CeleryExecutor" + DaskExecutor = "DaskExecutor" + MesosExecutor = "MesosExecutor" + KubernetesExecutor = "KubernetesExecutor" + def _get_executor(executor_name): @@ -57,20 +65,20 @@ def _get_executor(executor_name): Creates a new instance of the named executor. 
In case the executor name is not know in airflow, look for it in the plugins """ - if executor_name == 'LocalExecutor': + if executor_name == Executors.LocalExecutor: return LocalExecutor() - elif executor_name == 'SequentialExecutor': + elif executor_name == Executors.SequentialExecutor: return SequentialExecutor() - elif executor_name == 'CeleryExecutor': + elif executor_name == Executors.CeleryExecutor: from airflow.executors.celery_executor import CeleryExecutor return CeleryExecutor() - elif executor_name == 'DaskExecutor': + elif executor_name == Executors.DaskExecutor: from airflow.executors.dask_executor import DaskExecutor return DaskExecutor() - elif executor_name == 'MesosExecutor': + elif executor_name == Executors.MesosExecutor: from airflow.contrib.executors.mesos_executor import MesosExecutor return MesosExecutor() - elif executor_name == 'KubernetesExecutor': + elif executor_name == Executors.KubernetesExecutor: from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor return KubernetesExecutor() else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 4515dac..1ff4c21 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -133,7 +133,7 @@ class BaseExecutor(LoggingMixin): ti.refresh_from_db() if ti.state != State.RUNNING: self.running[key] = command - self.execute_async(key, command=command, queue=queue) + self.execute_async(key, command=command, queue=queue, executor_config=ti.executor_config) else: self.logger.info( 'Task is already running, not sending to ' @@ -174,7 +174,7 @@ class BaseExecutor(LoggingMixin): return cleared_events - def execute_async(self, key, command, queue=None): # pragma: no cover + def execute_async(self, key, command, queue=None, executor_config=None): # pragma: no cover """ This method will execute the command asynchronously. 
""" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index efabca5..70d0088 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -79,7 +79,7 @@ class CeleryExecutor(BaseExecutor): self.last_state = {} def execute_async(self, key, command, - queue=DEFAULT_CELERY_CONFIG['task_default_queue']): + queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None): self.log.info( "[celery] queuing {key} through celery, " "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 17ace55..42716ee 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -56,7 +56,7 @@ class DaskExecutor(BaseExecutor): self.client = distributed.Client(self.cluster_address, security=security) self.futures = {} - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, executor_config=None): if queue is not None: warnings.warn( 'DaskExecutor does not support queues. All tasks will be run in the same cluster' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/local_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index ed03980..4ac25f5 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -221,8 +221,8 @@ class LocalExecutor(BaseExecutor): self.impl.start() - def execute_async(self, key, command, queue=None): - self.impl.execute_async(key=key, command=command) + def execute_async(self, key, command, queue=None, executor_config=None): + self.queue.put((key, command)) def sync(self): self.impl.sync() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/executors/sequential_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index ed27109..39153b8 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -37,7 +37,7 @@ class SequentialExecutor(BaseExecutor): super(SequentialExecutor, self).__init__() self.commands_to_run = [] - def execute_async(self, key, command, queue=None): + def execute_async(self, key, command, queue=None, executor_config=None): self.commands_to_run.append((key, command,)) def sync(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py new file mode 100644 index 0000000..84c41ec --- /dev/null +++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py @@ -0,0 +1,44 @@ +# +# Licensed under the Apache 
License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""kubernetes_resource_checkpointing + +Revision ID: 33ae817a1ff4 +Revises: 947454bf1dff +Create Date: 2017-09-11 15:26:47.598494 + +""" + +# revision identifiers, used by Alembic. +revision = '27c6a30d7c24' +down_revision = '33ae817a1ff4' +branch_labels = None +depends_on = None + + +from alembic import op +import sqlalchemy as sa +import dill + + +TASK_INSTANCE_TABLE = "task_instance" +NEW_COLUMN = "executor_config" + + +def upgrade(): + op.add_column(TASK_INSTANCE_TABLE, sa.Column(NEW_COLUMN, sa.PickleType(pickler=dill))) + + +def downgrade(): + op.drop_column(TASK_INSTANCE_TABLE, NEW_COLUMN) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d03c363..ae387b6 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -824,6 +824,7 @@ class TaskInstance(Base, LoggingMixin): operator = Column(String(1000)) queued_dttm = Column(UtcDateTime) pid = Column(Integer) + executor_config = Column(PickleType(pickler=dill)) __table_args__ = ( Index('ti_dag_state', dag_id, state), @@ -862,6 +863,7 @@ class TaskInstance(Base, LoggingMixin): if state: self.state = state self.hostname = '' + self.executor_config = task.executor_config self.init_on_load() # Is this TaskInstance being currently running within `airflow run --raw`. # Not persisted to the database so only valid for the current process @@ -1147,6 +1149,7 @@ class TaskInstance(Base, LoggingMixin): self.max_tries = ti.max_tries self.hostname = ti.hostname self.pid = ti.pid + self.executor_config = ti.executor_config else: self.state = None @@ -2220,6 +2223,11 @@ class BaseOperator(LoggingMixin): :param task_concurrency: When set, a task will be able to limit the concurrent runs across execution_dates :type task_concurrency: int + :param executor_config: Additional task-level configuration parameters that are + interpreted by a specific executor. Parameters are namespaced by the name of executor. 
+ ``example: to run this task in a specific docker container through the KubernetesExecutor + MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})`` + :type executor_config: dict """ # For derived classes to define which fields will get jinjaified @@ -2264,6 +2272,7 @@ class BaseOperator(LoggingMixin): resources=None, run_as_user=None, task_concurrency=None, + executor_config=None, *args, **kwargs): @@ -2338,6 +2347,7 @@ class BaseOperator(LoggingMixin): self.resources = Resources(**(resources or {})) self.run_as_user = run_as_user self.task_concurrency = task_concurrency + self.executor_config = executor_config or {} # Private attributes self._upstream_task_ids = set() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/scripts/ci/kubernetes/docker/Dockerfile_zip ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/Dockerfile_zip b/scripts/ci/kubernetes/docker/Dockerfile_zip new file mode 100644 index 0000000..494b16a --- /dev/null +++ b/scripts/ci/kubernetes/docker/Dockerfile_zip @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one * +# or more contributor license agreements. See the NOTICE file * +# distributed with this work for additional information * +# regarding copyright ownership. The ASF licenses this file * +# to you under the Apache License, Version 2.0 (the * +# "License"); you may not use this file except in compliance * +# with the License. You may obtain a copy of the License at * +# * +# http://www.apache.org/licenses/LICENSE-2.0 * +# * +# Unless required by applicable law or agreed to in writing, * +# software distributed under the License is distributed on an * +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * +# KIND, either express or implied. See the License for the * +# specific language governing permissions and limitations * +# under the License. * + +FROM airflow/ci:latest + +RUN apt-get -y update && apt-get -y install zip unzip http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/scripts/ci/kubernetes/docker/build.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/kubernetes/docker/build.sh b/scripts/ci/kubernetes/docker/build.sh index d36ea86..7f0fe61 100755 --- a/scripts/ci/kubernetes/docker/build.sh +++ b/scripts/ci/kubernetes/docker/build.sh @@ -26,4 +26,6 @@ if [ $? 
-eq 0 ]; then fi cd $AIRFLOW_ROOT && python setup.py sdist && cp $AIRFLOW_ROOT/dist/*.tar.gz $DIRNAME/airflow.tar.gz && \ -cd $DIRNAME && docker build $DIRNAME --tag=${IMAGE}:${TAG} +cd $DIRNAME && \ +docker build -f Dockerfile $DIRNAME --tag=${IMAGE}:${TAG} && \ +docker build -f Dockerfile_zip $DIRNAME --tag=${IMAGE}_zip:${TAG} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/tests/contrib/executors/integration/test_kubernetes_executor_integration.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py index 709ae6a..97949ae 100644 --- a/tests/contrib/executors/integration/test_kubernetes_executor_integration.py +++ b/tests/contrib/executors/integration/test_kubernetes_executor_integration.py @@ -52,6 +52,14 @@ class KubernetesExecutorTest(unittest.TestCase): self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS) + @unittest.skipIf(SKIP_KUBE, 'Kubernetes integration tests are unsupported by this configuration') + def test_kubernetes_executor_config_works(self): + dag_id, run_id = "example_kubernetes_executor", uuid4().hex + run_dag(dag_id, run_id) + + self.assertEquals(get_dag_run_state(dag_id, run_id), DagRunState.RUNNING) + self.assertEquals(dag_final_state(dag_id, run_id, timeout=180), DagRunState.SUCCESS) + if __name__ == "__main__": unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0920efc/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index d6336f0..615ca9a 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -161,7 +161,7 @@ class BackfillJobTest(unittest.TestCase): 'example_trigger_target_dag', 'example_trigger_controller_dag', # tested above 'test_utils', # sleeps forever - 'example_kubernetes_operator', # only works with k8s cluster + 'example_kubernetes_executor' # requires kubernetes cluster ] logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
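----------------------------------------------------------------------

Usage note: the end-to-end flow added here is BaseOperator(..., executor_config={...})
-> TaskInstance.executor_config -> BaseExecutor.execute_async(executor_config=...)
-> KubernetesExecutorConfig.from_dict() -> WorkerConfiguration.make_pod(). Below is a
minimal sketch of the conversion step; it is illustrative rather than part of the
commit, assumes Airflow at this revision is importable, and borrows its config values
from the example DAG above.

    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig

    # The dict is namespaced by executor name, as in example_kubernetes_executor.py.
    executor_config = {
        "KubernetesExecutor": {
            "image": "airflow/ci_zip:latest",   # override the default worker image
            "request_memory": "128Mi",          # container resource request
            "limit_memory": "128Mi",            # container resource limit
        }
    }

    # from_dict() reads only the "KubernetesExecutor" section; passing None yields an
    # all-default config object, so tasks without executor_config still work.
    kube_config = KubernetesExecutorConfig.from_dict(executor_config)
    print(kube_config)            # repr shows the image and request/limit values
    print(kube_config.as_dict())  # plain dict with image, request_* and limit_* keys

Namespacing executor_config by executor name keeps the same DAG portable: executors
that do not understand the "KubernetesExecutor" key simply ignore the dict, while the
KubernetesExecutor uses it to pick the worker image and the pod's resource
requests/limits.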
