This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cd5f064d9fb904a0b4981919ed41bea9afb79897 Author: Daniel Imberman <[email protected]> AuthorDate: Fri Sep 11 10:47:59 2020 -0700 Modify helm chart to use pod_template_file (#10872) * Modify helm chart to use pod_template_file Since we are deprecating most k8sexecutor arguments we should use the pod_template_file when launching airflow using the KubernetesExecutor * fix tests * one more nit * fix dag command * fix pylint (cherry picked from commit 56bd9b7d6b494251fa728ff6a7eb06d6d7eeb2c8) --- .pre-commit-config.yaml | 2 +- airflow/executors/kubernetes_executor.py | 7 +- airflow/kubernetes/pod_generator.py | 47 ++++++- chart/files/pod-template-file.yaml | 105 +++++++++++++++ chart/templates/_helpers.yaml | 4 + chart/templates/configmap.yaml | 8 ++ .../templates/scheduler/scheduler-deployment.yaml | 4 + chart/tests/pod-template-file_test.yaml | 149 +++++++++++++++++++++ docs/production-deployment.rst | 5 - scripts/ci/kubernetes/ci_run_helm_testing.sh | 7 +- tests/kubernetes/test_pod_generator.py | 73 ++++++---- tests/kubernetes/test_worker_configuration.py | 7 +- 12 files changed, 371 insertions(+), 47 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1d278e5..eee592e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,7 +25,7 @@ repos: rev: v1.1.9 hooks: - id: forbid-tabs - exclude: ^docs/Makefile$ + exclude: ^docs/Makefile$|^clients/gen/go.sh - id: insert-license name: Add license for all SQL files files: \.sql$ diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 0ae8f16..73dd91e 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -440,10 +440,11 @@ class AirflowKubernetesScheduler(LoggingMixin): dag_id=pod_generator.make_safe_label_value(dag_id), task_id=pod_generator.make_safe_label_value(task_id), try_number=try_number, - date=self._datetime_to_label_safe_datestring(execution_date), + kube_image=self.kube_config.kube_image, + date=execution_date, command=command, - kube_executor_config=kube_executor_config, - worker_config=self.worker_configuration_pod + pod_override_object=kube_executor_config, + base_worker_pod=self.worker_configuration_pod ) sanitized_pod = self.launcher._client.api_client.sanitize_for_serialization(pod) diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 4fbfec1..5a57230 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -30,6 +30,7 @@ from functools import reduce import kubernetes.client.models as k8s import yaml +from dateutil import parser from kubernetes.client.api_client import ApiClient from airflow.contrib.kubernetes.pod import _extract_volume_mounts @@ -92,6 +93,30 @@ def make_safe_label_value(string): return safe_label +def datetime_to_label_safe_datestring(datetime_obj): + """ + Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but + not "_" let's + replace ":" with "_" + + :param datetime_obj: datetime.datetime object + :return: ISO-like string representing the datetime + """ + return datetime_obj.isoformat().replace(":", "_").replace('+', '_plus_') + + +def label_safe_datestring_to_datetime(string): + """ + Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not + "_", let's + replace ":" with "_" + + :param string: str + :return: datetime.datetime object + """ + return parser.parse(string.replace('_plus_', '+').replace("_", ":")) + + class PodGenerator(object): """ Contains Kubernetes Airflow Worker configuration logic @@ -496,10 +521,11 @@ class PodGenerator(object): task_id, pod_id, try_number, + kube_image, date, command, - kube_executor_config, - worker_config, + pod_override_object, + base_worker_pod, namespace, worker_uuid ): @@ -511,22 +537,29 @@ class PodGenerator(object): """ dynamic_pod = PodGenerator( namespace=namespace, + image=kube_image, labels={ 'airflow-worker': worker_uuid, - 'dag_id': dag_id, - 'task_id': task_id, - 'execution_date': date, + 'dag_id': make_safe_label_value(dag_id), + 'task_id': make_safe_label_value(task_id), + 'execution_date': datetime_to_label_safe_datestring(date), 'try_number': str(try_number), 'airflow_version': airflow_version.replace('+', '-'), 'kubernetes_executor': 'True', }, + annotations={ + 'dag_id': dag_id, + 'task_id': task_id, + 'execution_date': date.isoformat(), + 'try_number': str(try_number), + }, cmds=command, name=pod_id ).gen_pod() # Reconcile the pods starting with the first chronologically, - # Pod from the airflow.cfg -> Pod from executor_config arg -> Pod from the K8s executor - pod_list = [worker_config, kube_executor_config, dynamic_pod] + # Pod from the pod_template_File -> Pod from executor_config arg -> Pod from the K8s executor + pod_list = [base_worker_pod, pod_override_object, dynamic_pod] return reduce(PodGenerator.reconcile_pods, pod_list) diff --git a/chart/files/pod-template-file.yaml b/chart/files/pod-template-file.yaml new file mode 100644 index 0000000..b19edf1 --- /dev/null +++ b/chart/files/pod-template-file.yaml @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +apiVersion: v1 +kind: Pod +metadata: + name: dummy-name +spec: +{{- if .Values.dags.gitSync.enabled }} + initContainers: +{{- include "git_sync_container" . | indent 8 }} +{{- end }} + containers: + - args: [] + command: [] + env: + - name: AIRFLOW__CORE__EXECUTOR + value: LocalExecutor +{{- include "standard_airflow_environment" . | indent 4 }} + envFrom: [] + image: dummy_image + imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} + name: base + ports: [] + volumeMounts: + - mountPath: {{ template "airflow_logs" . }} + name: airflow-logs +{{- if or .Values.dags.persistence.enabled .Values.dags.gitSync.enabled }} + - mountPath: {{ template "airflow_dags_mount_path" . }} + name: airflow-dags + readOnly: false +{{- end }} +{{- if .Values.dags.gitSync.sshKeySecret }} + - mountPath: /etc/git-secret/known_hosts + name: {{ .Values.dags.gitSync.knownHosts }} + subPath: known_hosts +{{- end }} +{{- if .Values.dags.gitSync.sshKeySecret }} + - mountPath: /etc/git-secret/ssh + name: git-sync-ssh-key + subPath: ssh +{{- end }} +{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }} + - mountPath: {{ include "airflow_dags_mount_path" . }} + name: airflow-dags + readOnly: true +{{- if .Values.dags.persistence.enabled }} + subPath: {{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }} +{{- end }} +{{- end }} + hostNetwork: false + {{- if or .Values.registry.secretName .Values.registry.connection }} + imagePullSecrets: + - name: {{ template "registry_secret" . }} + {{- end }} + restartPolicy: Never + securityContext: + runAsUser: {{ .Values.uid }} + nodeSelector: + {{ toYaml .Values.nodeSelector | indent 8 }} + affinity: + {{ toYaml .Values.affinity | indent 8 }} + tolerations: + {{ toYaml .Values.tolerations | indent 8 }} + serviceAccountName: '{{ .Release.Name }}-worker-serviceaccount' + volumes: + {{- if .Values.dags.persistence.enabled }} + - name: dags + persistentVolumeClaim: + claimName: {{ template "airflow_dags_volume_claim" . }} + {{- else if .Values.dags.gitSync.enabled }} + - name: dags + emptyDir: {} + {{- end }} + {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }} +{{- include "git_sync_ssh_key_volume" . | indent 2 }} + {{- end }} + - emptyDir: {} + name: airflow-logs +{{- if .Values.dags.gitSync.knownHosts }} + - configMap: + defaultMode: 288 + name: {{ include "airflow_config" . }} + name: git-sync-known-hosts +{{- end }} + - configMap: + name: {{ include "airflow_config" . }} + name: airflow-config + - configMap: + name: {{ include "airflow_config" . }} + name: airflow-local-settings diff --git a/chart/templates/_helpers.yaml b/chart/templates/_helpers.yaml index 898924f..49c3b3f 100644 --- a/chart/templates/_helpers.yaml +++ b/chart/templates/_helpers.yaml @@ -213,6 +213,10 @@ {{ default (printf "%s-airflow-result-backend" .Release.Name) .Values.data.resultBackendSecretName }} {{- end }} +{{ define "airflow_pod_template_file" -}} +{{ (printf "%s/pod_templates" .Values.airflowHome) }} +{{- end }} + {{ define "pgbouncer_config_secret" -}} {{ .Release.Name }}-pgbouncer-config {{- end }} diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index f0e09a0..cc9a388 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -55,3 +55,11 @@ data: known_hosts: | {{ .Values.dags.gitSync.knownHosts | nindent 4 }} {{- end }} +{{- if eq .Values.executor "KubernetesExecutor" }} + pod_template_file.yaml: |- +{{- if .Values.podTemplate }} + {{ .Values.podTemplate | nindent 4 }} +{{- else }} +{{ tpl (.Files.Get "files/pod-template-file.yaml") . | nindent 4 }} +{{- end }} +{{- end }} diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml index 9331556..f2b4a99 100644 --- a/chart/templates/scheduler/scheduler-deployment.yaml +++ b/chart/templates/scheduler/scheduler-deployment.yaml @@ -133,6 +133,10 @@ spec: resources: {{ toYaml .Values.scheduler.resources | indent 12 }} volumeMounts: + - name: config + mountPath: {{ include "airflow_pod_template_file" . }}/pod_template_file.yaml + subPath: pod_template_file.yaml + readOnly: true - name: logs mountPath: {{ template "airflow_logs" . }} - name: config diff --git a/chart/tests/pod-template-file_test.yaml b/chart/tests/pod-template-file_test.yaml new file mode 100644 index 0000000..64e99f8 --- /dev/null +++ b/chart/tests/pod-template-file_test.yaml @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +templates: + - pod-template-file.yaml +tests: + - it: should work + asserts: + - isKind: + of: Pod + - equal: + path: spec.containers[0].image + value: dummy_image + - equal: + path: spec.containers[0].name + value: base + - it: should add an initContainer if gitSync is true + set: + dags: + gitSync: + enabled: true + containerName: git-sync-test + containerTag: test-tag + containerRepository: test-registry/test-repo + wait: 66 + maxFailures: 70 + subPath: "path1/path2" + dest: "test-dest" + root: "/git-root" + rev: HEAD + depth: 1 + repo: https://github.com/apache/airflow.git + branch: test-branch + sshKeySecret: ~ + credentialsSecret: ~ + knownHosts: ~ + asserts: + - isKind: + of: Pod + - equal: + path: spec.initContainers[0] + value: + name: git-sync-test + image: test-registry/test-repo:test-tag + env: + - name: GIT_SYNC_REV + value: HEAD + - name: GIT_SYNC_BRANCH + value: test-branch + - name: GIT_SYNC_REPO + value: https://github.com/apache/airflow.git + - name: GIT_SYNC_DEPTH + value: "1" + - name: GIT_SYNC_ROOT + value: /git-root + - name: GIT_SYNC_DEST + value: test-dest + - name: GIT_SYNC_ADD_USER + value: "true" + - name: GIT_SYNC_WAIT + value: "66" + - name: GIT_SYNC_MAX_SYNC_FAILURES + value: "70" + volumeMounts: + - mountPath: /git-root + name: dags + - it: validate if ssh params are added + set: + dags: + gitSync: + enabled: true + containerName: git-sync-test + sshKeySecret: ssh-secret + knownHosts: ~ + branch: test-branch + asserts: + - contains: + path: spec.initContainers[0].env + content: + name: GIT_SSH_KEY_FILE + value: "/etc/git-secret/ssh" + - contains: + path: spec.initContainers[0].env + content: + name: GIT_SYNC_SSH + value: "true" + - contains: + path: spec.initContainers[0].env + content: + name: GIT_KNOWN_HOSTS + value: "false" + - contains: + path: spec.volumes + content: + name: git-sync-ssh-key + secret: + secretName: ssh-secret + defaultMode: 288 + - it: should set username and pass env variables + set: + dags: + gitSync: + enabled: true + credentialsSecret: user-pass-secret + sshKeySecret: ~ + asserts: + - contains: + path: spec.initContainers[0].env + content: + name: GIT_SYNC_USERNAME + valueFrom: + secretKeyRef: + name: user-pass-secret + key: GIT_SYNC_USERNAME + - contains: + path: spec.initContainers[0].env + content: + name: GIT_SYNC_PASSWORD + valueFrom: + secretKeyRef: + name: user-pass-secret + key: GIT_SYNC_PASSWORD + - it: should set the volume claim correctly when using an existing claim + set: + dags: + persistence: + enabled: true + existingClaim: test-claim + asserts: + - contains: + path: spec.volumes + content: + name: dags + persistentVolumeClaim: + claimName: test-claim diff --git a/docs/production-deployment.rst b/docs/production-deployment.rst index 7c4bfab..de974a6 100644 --- a/docs/production-deployment.rst +++ b/docs/production-deployment.rst @@ -630,8 +630,3 @@ Keytab secret and both containers in the same Pod share the volume, where tempor the side-care container and read by the worker container. This concept is implemented in the development version of the Helm Chart that is part of Airflow source code. - - -.. spelling:: - - pypirc diff --git a/scripts/ci/kubernetes/ci_run_helm_testing.sh b/scripts/ci/kubernetes/ci_run_helm_testing.sh index e5308db..224cc9e 100755 --- a/scripts/ci/kubernetes/ci_run_helm_testing.sh +++ b/scripts/ci/kubernetes/ci_run_helm_testing.sh @@ -20,9 +20,10 @@ echo "Running helm tests" chart_directory="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/../../../chart/" -echo "Chart directory is ${chart_directory}" +cat chart/files/pod-template-file.yaml > chart/templates/pod-template-file.yaml -docker run -w /airflow-chart -v "${chart_directory}":/airflow-chart \ +docker run -w /airflow-chart -v "$chart_directory":/airflow-chart \ --entrypoint /bin/sh \ aneeshkj/helm-unittest \ - -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ." + -c "helm repo add stable https://kubernetes-charts.storage.googleapis.com; helm dependency update ; helm unittest ." \ + && rm chart/templates/pod-template-file.yaml diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index 0c9d722..fed7c97 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -19,12 +19,14 @@ import unittest import sys from tests.compat import mock import uuid -import kubernetes.client.models as k8s -from kubernetes.client import ApiClient + +from dateutil import parser +from kubernetes.client import ApiClient, models as k8s from airflow.kubernetes.k8s_model import append_to_pod from airflow.kubernetes.pod import Resources -from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects +from airflow.kubernetes.pod_generator import PodDefaults, PodGenerator, extend_object_field, merge_objects, \ + datetime_to_label_safe_datestring from airflow.kubernetes.secret import Secret @@ -63,16 +65,25 @@ class TestPodGenerator(unittest.TestCase): Secret('env', 'TARGET', 'secret_b', 'source_b'), ] + self.execution_date = parser.parse('2020-08-24 00:00:00.000000') + self.execution_date_label = datetime_to_label_safe_datestring(self.execution_date) + self.dag_id = 'dag_id' + self.task_id = 'task_id' + self.try_number = 3 self.labels = { 'airflow-worker': 'uuid', 'dag_id': 'dag_id', - 'execution_date': 'date', + 'execution_date': mock.ANY, 'task_id': 'task_id', 'try_number': '3', 'airflow_version': mock.ANY, 'kubernetes_executor': 'True' } self.metadata = { + 'annotations': {'dag_id': 'dag_id', + 'execution_date': '2020-08-24T00:00:00', + 'task_id': 'task_id', + 'try_number': '3'}, 'labels': self.labels, 'name': 'pod_id-' + self.static_uuid.hex, 'namespace': 'namespace' @@ -646,8 +657,9 @@ class TestPodGenerator(unittest.TestCase): 'dag_id', 'task_id', 'pod_id', - 3, - 'date', + self.try_number, + "kube_image", + self.execution_date, ['command'], executor_config, worker_config, @@ -667,6 +679,7 @@ class TestPodGenerator(unittest.TestCase): 'env': [], 'envFrom': [], 'name': 'base', + 'image': 'kube_image', 'ports': [], 'resources': { 'limits': { @@ -706,8 +719,9 @@ class TestPodGenerator(unittest.TestCase): 'dag_id', 'task_id', 'pod_id', - 3, - 'date', + self.try_number, + "kube_image", + self.execution_date, ['command'], executor_config, worker_config, @@ -727,6 +741,7 @@ class TestPodGenerator(unittest.TestCase): 'env': [], 'envFrom': [], 'name': 'base', + 'image': 'kube_image', 'ports': [], 'resources': { 'limits': { @@ -789,8 +804,9 @@ class TestPodGenerator(unittest.TestCase): 'dag_id', 'task_id', 'pod_id', - 3, - 'date', + self.try_number, + "kube_image", + self.execution_date, ['command'], executor_config, worker_config, @@ -799,7 +815,7 @@ class TestPodGenerator(unittest.TestCase): ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.metadata.update({'annotations': {'should': 'stay'}}) + self.metadata['annotations']['should'] = 'stay' self.assertEqual({ 'apiVersion': 'v1', @@ -811,6 +827,7 @@ class TestPodGenerator(unittest.TestCase): 'command': ['command'], 'env': [], 'envFrom': [], + 'image': 'kube_image', 'name': 'base', 'ports': [], 'resources': { @@ -872,20 +889,21 @@ class TestPodGenerator(unittest.TestCase): ) result = PodGenerator.construct_pod( - 'dag_id', - 'task_id', - 'pod_id', - 3, - 'date', - ['command'], - executor_config, - worker_config, - 'namespace', - 'uuid', + dag_id='dag_id', + task_id='task_id', + pod_id='pod_id', + try_number=3, + kube_image='kube_image', + date=self.execution_date, + command=['command'], + pod_override_object=executor_config, + base_worker_pod=worker_config, + namespace='namespace', + worker_uuid='uuid', ) sanitized_result = self.k8s_client.sanitize_for_serialization(result) - self.metadata.update({'annotations': {'should': 'stay'}}) + self.metadata['annotations']['should'] = 'stay' self.assertEqual({ 'apiVersion': 'v1', @@ -898,6 +916,7 @@ class TestPodGenerator(unittest.TestCase): 'env': [], 'envFrom': [], 'name': 'base', + 'image': 'kube_image', 'ports': [], 'resources': { 'limits': { @@ -1081,12 +1100,14 @@ spec: worker_uuid="test", pod_id="test", dag_id="test", + kube_image="foo", task_id="test", try_number=1, - date="23-07-2020", + date=parser.parse("23-07-2020"), command="test", - kube_executor_config=None, - worker_config=k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"}, - annotations={"my.annotation": "foo"}))) + pod_override_object=None, + base_worker_pod=k8s.V1Pod( + metadata=k8s.V1ObjectMeta(labels={"airflow-test": "airflow-task-pod"}, + annotations={"my.annotation": "foo"}))) self.assertIn("airflow-test", pod.metadata.labels) self.assertIn("my.annotation", pod.metadata.annotations) diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py index 40271dc..0ac7940 100644 --- a/tests/kubernetes/test_worker_configuration.py +++ b/tests/kubernetes/test_worker_configuration.py @@ -18,6 +18,7 @@ import unittest import six +from dateutil import parser from parameterized import parameterized from tests.compat import mock @@ -373,12 +374,14 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): self.kube_config.base_log_folder = '/logs' worker_config = WorkerConfiguration(self.kube_config) + execution_date = parser.parse('2019-11-21 11:08:22.920875') pod = PodGenerator.construct_pod( "test_dag_id", "test_task_id", "test_pod_id", 1, - "2019-11-21 11:08:22.920875", + 'kube_image', + execution_date, ["bash -c 'ls /'"], None, worker_config.as_pod(), @@ -389,7 +392,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): 'airflow-worker': 'sample-uuid', 'airflow_version': airflow_version.replace('+', '-'), 'dag_id': 'test_dag_id', - 'execution_date': '2019-11-21 11:08:22.920875', + 'execution_date': '2019-11-21T11_08_22.920875', 'kubernetes_executor': 'True', 'my_label': 'label_id', 'task_id': 'test_task_id',
