[AIRFLOW-1999] Add per-task GCP service account support
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdb43cb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdb43cb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdb43cb8 Branch: refs/heads/master Commit: cdb43cb87c9bd6db1cb0317cef861faf9c7b0e86 Parents: b9a87a0 Author: fenglu-g <[email protected]> Authored: Wed Mar 21 01:24:00 2018 -0400 Committer: Fokko Driesprong <[email protected]> Committed: Sun Apr 22 10:23:06 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/executors/kubernetes_executor.py | 14 +++++++++----- .../kubernetes_request_factory.py | 6 ++++++ .../kubernetes_request_factory/pod_request_factory.py | 1 + airflow/contrib/kubernetes/pod.py | 4 +++- airflow/contrib/kubernetes/worker_configuration.py | 7 ++++++- 5 files changed, 25 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/executors/kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 49993a8..b497387 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -37,23 +37,25 @@ from airflow.utils.log.logging_mixin import LoggingMixin class KubernetesExecutorConfig: def __init__(self, image=None, request_memory=None, request_cpu=None, - limit_memory=None, limit_cpu=None): + limit_memory=None, limit_cpu=None, gcp_service_account_key=None): self.image = image self.request_memory = request_memory self.request_cpu = request_cpu self.limit_memory = limit_memory self.limit_cpu = limit_cpu + self.gcp_service_account_key = gcp_service_account_key def __repr__(self): return "{}(image={}, request_memory={} ,request_cpu={}, limit_memory={}, " \ - "limit_cpu={})"\ + "limit_cpu={}, gcp_service_account_key={})"\ .format( KubernetesExecutorConfig.__name__, self.image, self.request_memory, self.request_cpu, self.limit_memory, - self.limit_cpu + self.limit_cpu, + self.gcp_service_account_key ) @staticmethod @@ -72,7 +74,8 @@ class KubernetesExecutorConfig: request_memory=namespaced.get("request_memory", None), request_cpu=namespaced.get("request_cpu", None), limit_memory=namespaced.get("limit_memory", None), - limit_cpu=namespaced.get("limit_cpu", None) + limit_cpu=namespaced.get("limit_cpu", None), + gcp_service_account_key=namespaced.get("gcp_service_account_key", None) ) def as_dict(self): @@ -81,7 +84,8 @@ class KubernetesExecutorConfig: "request_memory": self.request_memory, "request_cpu": self.request_cpu, "limit_memory": self.limit_memory, - "limit_cpu": self.limit_cpu + "limit_cpu": self.limit_cpu, + "gcp_service_account_key": self.gcp_service_account_key } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index cbf3fce..6e8632f 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -64,6 +64,12 @@ class KubernetesRequestFactory: req['metadata']['labels'][k] = v @staticmethod + def extract_annotations(pod, req): + req['metadata']['annotations'] = req['metadata'].get('annotations', {}) + for k, v in six.iteritems(pod.annotations): + req['metadata']['annotations'][k] = v + + @staticmethod def extract_cmds(pod, req): req['spec']['containers'][0]['command'] = pod.cmds http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 44b05dd..106a6be 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -57,4 +57,5 @@ spec: self.extract_service_account_name(pod, req) self.extract_init_containers(pod, req) self.extract_image_pull_secrets(pod, req) + self.extract_annotations(pod, req) return req http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/pod.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 01d6760..8dbc947 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -72,7 +72,8 @@ class Pod: image_pull_secrets=None, init_containers=None, service_account_name=None, - resources=None + resources=None, + annotations=None ): self.image = image self.envs = envs or {} @@ -91,3 +92,4 @@ class Pod: self.init_containers = init_containers self.service_account_name = service_account_name self.resources = resources or Resources() + self.annotations = annotations or {} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdb43cb8/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 5cb92ef..988f4a5 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -150,6 +150,10 @@ class WorkerConfiguration: limit_memory=kube_executor_config.limit_memory, limit_cpu=kube_executor_config.limit_cpu ) + gcp_sa_key = kube_executor_config.gcp_service_account_key + annotations = { + "iam.cloud.google.com/service-account": gcp_sa_key + } if gcp_sa_key else {} return Pod( namespace=namespace, @@ -170,5 +174,6 @@ class WorkerConfiguration: init_containers=worker_init_container_spec, volumes=volumes, volume_mounts=volume_mounts, - resources=resources + resources=resources, + annotations=annotations )
