Repository: incubator-airflow Updated Branches: refs/heads/master 4be1ffeec -> 4cfebd8ae
[AIRFLOW-2712] Pass annotations to KubernetesExecutorConfig Closes #3576 from tfpk/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cfebd8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cfebd8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cfebd8a Branch: refs/heads/master Commit: 4cfebd8aed6acd64b34daa983413f60552e671ec Parents: 4be1ffe Author: Tom Kunc <[email protected]> Authored: Tue Jul 24 01:12:09 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Tue Jul 24 01:12:09 2018 +0100 ---------------------------------------------------------------------- .../contrib/executors/kubernetes_executor.py | 14 +++--- .../contrib/kubernetes/worker_configuration.py | 6 +-- .../example_kubernetes_annotation.py | 47 ++++++++++++++++++++ .../minikube/test_kubernetes_executor.py | 18 ++++---- tests/core.py | 2 +- 5 files changed, 70 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 4ea52c4..788d925 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -39,7 +39,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, - gcp_service_account_key=None, node_selectors=None, affinity=None): + gcp_service_account_key=None, node_selectors=None, affinity=None, + annotations=None): self.image = image self.image_pull_policy = image_pull_policy self.request_memory = request_memory @@ -49,15 +50,16 @@ class KubernetesExecutorConfig: self.gcp_service_account_key = gcp_service_account_key self.node_selectors = node_selectors self.affinity = affinity + self.annotations = annotations def __repr__(self): return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \ "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \ - "node_selectors={}, affinity={})" \ + "node_selectors={}, affinity={}, annotations={})" \ .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy, self.request_memory, self.request_cpu, self.limit_memory, self.limit_cpu, self.gcp_service_account_key, self.node_selectors, - self.affinity) + self.affinity, self.annotations) @staticmethod def from_dict(obj): @@ -79,7 +81,8 @@ class KubernetesExecutorConfig: limit_cpu=namespaced.get('limit_cpu', None), gcp_service_account_key=namespaced.get('gcp_service_account_key', None), node_selectors=namespaced.get('node_selectors', None), - affinity=namespaced.get('affinity', None) + affinity=namespaced.get('affinity', None), + annotations=namespaced.get('annotations', {}), ) def as_dict(self): @@ -92,7 +95,8 @@ class KubernetesExecutorConfig: 'limit_cpu': self.limit_cpu, 'gcp_service_account_key': self.gcp_service_account_key, 'node_selectors': self.node_selectors, - 'affinity': self.affinity + 'affinity': self.affinity, + 'annotations': self.annotations, } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 51b51e8..c9f86b0 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -191,9 +191,9 @@ class WorkerConfiguration(LoggingMixin): limit_cpu=kube_executor_config.limit_cpu ) gcp_sa_key = kube_executor_config.gcp_service_account_key - annotations = { - 'iam.cloud.google.com/service-account': gcp_sa_key - } if gcp_sa_key else {} + annotations = kube_executor_config.annotations.copy() + if gcp_sa_key: + annotations['iam.cloud.google.com/service-account'] = gcp_sa_key return Pod( namespace=namespace, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/example_dags/example_kubernetes_annotation.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_kubernetes_annotation.py b/airflow/example_dags/example_kubernetes_annotation.py new file mode 100644 index 0000000..058baf6 --- /dev/null +++ b/airflow/example_dags/example_kubernetes_annotation.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import print_function +import airflow +from airflow.operators.python_operator import PythonOperator +from airflow.models import DAG + +args = { + 'owner': 'airflow', + 'start_date': airflow.utils.dates.days_ago(2) +} + +dag = DAG( + dag_id='example_kubernetes_annotation', default_args=args, + schedule_interval=None +) + + +def print_stuff(): + print("annotated!") + + +# You can use annotations on your kubernetes pods! +start_task = PythonOperator( + task_id="start_task", python_callable=print_stuff, dag=dag, + executor_config={ + "KubernetesExecutor": { + "annotations": {"test": "annotation"} + } + } +) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/tests/contrib/minikube/test_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py index fe38efb..769baae 100644 --- a/tests/contrib/minikube/test_kubernetes_executor.py +++ b/tests/contrib/minikube/test_kubernetes_executor.py @@ -149,8 +149,9 @@ class KubernetesExecutorTest(unittest.TestCase): def test_integration_run_dag(self): host = get_minikube_host() + dag_id = 'example_kubernetes_annotation' - result_json = self.start_dag(dag_id='example_python_operator', host=host) + result_json = self.start_dag(dag_id=dag_id, host=host) self.assertGreater(len(result_json['items']), 0) @@ -160,19 +161,20 @@ class KubernetesExecutorTest(unittest.TestCase): # Wait 100 seconds for the operator to complete self.monitor_task(host=host, execution_date=execution_date, - dag_id='example_python_operator', - task_id='print_the_context', + dag_id=dag_id, + task_id='start_task', expected_final_state='success', timeout=100) self.ensure_dag_expected_state(host=host, execution_date=execution_date, - dag_id='example_python_operator', + dag_id=dag_id, expected_final_state='success', timeout=100) def test_integration_run_dag_with_scheduler_failure(self): host = get_minikube_host() + dag_id = 'example_kubernetes_annotation' - result_json = self.start_dag(dag_id='example_python_operator', host=host) + result_json = self.start_dag(dag_id=dag_id, host=host) self.assertGreater(len(result_json['items']), 0) @@ -186,13 +188,13 @@ class KubernetesExecutorTest(unittest.TestCase): # Wait 100 seconds for the operator to complete self.monitor_task(host=host, execution_date=execution_date, - dag_id='example_python_operator', - task_id='print_the_context', + dag_id=dag_id, + task_id='start_task', expected_final_state='success', timeout=120) self.ensure_dag_expected_state(host=host, execution_date=execution_date, - dag_id='example_python_operator', + dag_id=dag_id, expected_final_state='success', timeout=100) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 513aacf..6cfd10b 100644 --- a/tests/core.py +++ b/tests/core.py @@ -72,7 +72,7 @@ from jinja2 import UndefinedError import six -NUM_EXAMPLE_DAGS = 20 +NUM_EXAMPLE_DAGS = 21 DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags')
