Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4be1ffeec -> 4cfebd8ae


[AIRFLOW-2712] Pass annotations to KubernetesExecutorConfig

Closes #3576 from tfpk/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cfebd8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cfebd8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cfebd8a

Branch: refs/heads/master
Commit: 4cfebd8aed6acd64b34daa983413f60552e671ec
Parents: 4be1ffe
Author: Tom Kunc <[email protected]>
Authored: Tue Jul 24 01:12:09 2018 +0100
Committer: Kaxil Naik <[email protected]>
Committed: Tue Jul 24 01:12:09 2018 +0100

----------------------------------------------------------------------
 .../contrib/executors/kubernetes_executor.py    | 14 +++---
 .../contrib/kubernetes/worker_configuration.py  |  6 +--
 .../example_kubernetes_annotation.py            | 47 ++++++++++++++++++++
 .../minikube/test_kubernetes_executor.py        | 18 ++++----
 tests/core.py                                   |  2 +-
 5 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/contrib/executors/kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 4ea52c4..788d925 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -39,7 +39,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 class KubernetesExecutorConfig:
     def __init__(self, image=None, image_pull_policy=None, request_memory=None,
                  request_cpu=None, limit_memory=None, limit_cpu=None,
-                 gcp_service_account_key=None, node_selectors=None, affinity=None):
+                 gcp_service_account_key=None, node_selectors=None, affinity=None,
+                 annotations=None):
         self.image = image
         self.image_pull_policy = image_pull_policy
         self.request_memory = request_memory
@@ -49,15 +50,16 @@ class KubernetesExecutorConfig:
         self.gcp_service_account_key = gcp_service_account_key
         self.node_selectors = node_selectors
         self.affinity = affinity
+        self.annotations = annotations
 
     def __repr__(self):
         return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \
                "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
-               "node_selectors={}, affinity={})" \
+               "node_selectors={}, affinity={}, annotations={})" \
             .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy,
                     self.request_memory, self.request_cpu, self.limit_memory,
                     self.limit_cpu, self.gcp_service_account_key, self.node_selectors,
-                    self.affinity)
+                    self.affinity, self.annotations)
 
     @staticmethod
     def from_dict(obj):
@@ -79,7 +81,8 @@ class KubernetesExecutorConfig:
             limit_cpu=namespaced.get('limit_cpu', None),
             gcp_service_account_key=namespaced.get('gcp_service_account_key', None),
             node_selectors=namespaced.get('node_selectors', None),
-            affinity=namespaced.get('affinity', None)
+            affinity=namespaced.get('affinity', None),
+            annotations=namespaced.get('annotations', {}),
         )
 
     def as_dict(self):
@@ -92,7 +95,8 @@ class KubernetesExecutorConfig:
             'limit_cpu': self.limit_cpu,
             'gcp_service_account_key': self.gcp_service_account_key,
             'node_selectors': self.node_selectors,
-            'affinity': self.affinity
+            'affinity': self.affinity,
+            'annotations': self.annotations,
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 51b51e8..c9f86b0 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -191,9 +191,9 @@ class WorkerConfiguration(LoggingMixin):
             limit_cpu=kube_executor_config.limit_cpu
         )
         gcp_sa_key = kube_executor_config.gcp_service_account_key
-        annotations = {
-            'iam.cloud.google.com/service-account': gcp_sa_key
-        } if gcp_sa_key else {}
+        annotations = kube_executor_config.annotations.copy()
+        if gcp_sa_key:
+            annotations['iam.cloud.google.com/service-account'] = gcp_sa_key
 
         return Pod(
             namespace=namespace,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/airflow/example_dags/example_kubernetes_annotation.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_kubernetes_annotation.py b/airflow/example_dags/example_kubernetes_annotation.py
new file mode 100644
index 0000000..058baf6
--- /dev/null
+++ b/airflow/example_dags/example_kubernetes_annotation.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import print_function
+import airflow
+from airflow.operators.python_operator import PythonOperator
+from airflow.models import DAG
+
+args = {
+    'owner': 'airflow',
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_kubernetes_annotation', default_args=args,
+    schedule_interval=None
+)
+
+
+def print_stuff():
+    print("annotated!")
+
+
+# You can use annotations on your kubernetes pods!
+start_task = PythonOperator(
+    task_id="start_task", python_callable=print_stuff, dag=dag,
+    executor_config={
+        "KubernetesExecutor": {
+            "annotations": {"test": "annotation"}
+        }
+    }
+)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/tests/contrib/minikube/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
index fe38efb..769baae 100644
--- a/tests/contrib/minikube/test_kubernetes_executor.py
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -149,8 +149,9 @@ class KubernetesExecutorTest(unittest.TestCase):
 
     def test_integration_run_dag(self):
         host = get_minikube_host()
+        dag_id = 'example_kubernetes_annotation'
 
-        result_json = self.start_dag(dag_id='example_python_operator', host=host)
+        result_json = self.start_dag(dag_id=dag_id, host=host)
 
         self.assertGreater(len(result_json['items']), 0)
 
@@ -160,19 +161,20 @@ class KubernetesExecutorTest(unittest.TestCase):
         # Wait 100 seconds for the operator to complete
         self.monitor_task(host=host,
                           execution_date=execution_date,
-                          dag_id='example_python_operator',
-                          task_id='print_the_context',
+                          dag_id=dag_id,
+                          task_id='start_task',
                           expected_final_state='success', timeout=100)
 
         self.ensure_dag_expected_state(host=host,
                                        execution_date=execution_date,
-                                       dag_id='example_python_operator',
+                                       dag_id=dag_id,
                                        expected_final_state='success', timeout=100)
 
     def test_integration_run_dag_with_scheduler_failure(self):
         host = get_minikube_host()
+        dag_id = 'example_kubernetes_annotation'
 
-        result_json = self.start_dag(dag_id='example_python_operator', host=host)
+        result_json = self.start_dag(dag_id=dag_id, host=host)
 
         self.assertGreater(len(result_json['items']), 0)
 
@@ -186,13 +188,13 @@ class KubernetesExecutorTest(unittest.TestCase):
         # Wait 100 seconds for the operator to complete
         self.monitor_task(host=host,
                           execution_date=execution_date,
-                          dag_id='example_python_operator',
-                          task_id='print_the_context',
+                          dag_id=dag_id,
+                          task_id='start_task',
                           expected_final_state='success', timeout=120)
 
         self.ensure_dag_expected_state(host=host,
                                        execution_date=execution_date,
-                                       dag_id='example_python_operator',
+                                       dag_id=dag_id,
                                        expected_final_state='success', timeout=100)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cfebd8a/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 513aacf..6cfd10b 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -72,7 +72,7 @@ from jinja2 import UndefinedError
 
 import six
 
-NUM_EXAMPLE_DAGS = 20
+NUM_EXAMPLE_DAGS = 21
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')

Reply via email to