This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 33992dc693f51d0dd7571d9b377bfd944181ab90 Author: Brandon T. Willard <[email protected]> AuthorDate: Tue Apr 7 15:35:09 2020 -0500 [AIRFLOW-6778] Add a configurable DAGs volume mount path for Kubernetes (#8147) (cherry picked from commit 75896c30cf37002585e3b17efa002da279090f76) --- airflow/config_templates/config.yml | 7 +++++++ airflow/config_templates/default_airflow.cfg | 3 +++ airflow/executors/kubernetes_executor.py | 2 ++ airflow/kubernetes/worker_configuration.py | 5 +++++ tests/kubernetes/test_worker_configuration.py | 6 ++++++ 5 files changed, 23 insertions(+) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a7fa7a6..9b63200 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1827,6 +1827,13 @@ type: string example: ~ default: "" + - name: dags_volume_mount_point + description: | + For either git sync or volume mounted DAGs, the worker will mount the volume in this path + version_added: ~ + type: string + example: ~ + default: "" - name: dags_volume_claim description: | For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 27fe92a..ca9de12 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -853,6 +853,9 @@ dags_in_image = False # For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs dags_volume_subpath = +# For either git sync or volume mounted DAGs, the worker will mount the volume in this path +dags_volume_mount_point = + # For DAGs mounted via a volume claim (mutually exclusive with git-sync and host path) dags_volume_claim = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 6ec2660..98e3154 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -125,6 +125,8 @@ class KubeConfig: # DAGs directly self.dags_volume_claim = conf.get(self.kubernetes_section, 'dags_volume_claim') + self.dags_volume_mount_point = conf.get(self.kubernetes_section, 'dags_volume_mount_point') + # This prop may optionally be set for PV Claims and is used to write logs self.logs_volume_claim = conf.get(self.kubernetes_section, 'logs_volume_claim') diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py index 820763b..9c35910 100644 --- a/airflow/kubernetes/worker_configuration.py +++ b/airflow/kubernetes/worker_configuration.py @@ -412,6 +412,11 @@ class WorkerConfiguration(LoggingMixin): return list(volumes.values()) def generate_dag_volume_mount_path(self): + """Generate path for DAG volume""" + + if self.kube_config.dags_volume_mount_point: + return self.kube_config.dags_volume_mount_point + if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host: return self.worker_airflow_dags diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py index 73b3f20..0730595 100644 --- a/tests/kubernetes/test_worker_configuration.py +++ b/tests/kubernetes/test_worker_configuration.py @@ -88,6 +88,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): self.kube_config.airflow_dags = 'dags' self.kube_config.airflow_logs = 'logs' self.kube_config.dags_volume_subpath = None + self.kube_config.dags_volume_mount_point = None self.kube_config.logs_volume_subpath = None self.kube_config.dags_in_image = False self.kube_config.dags_folder = None @@ -145,6 +146,11 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() self.assertEqual(dag_volume_mount_path, self.kube_config.dags_folder) + self.kube_config.dags_volume_mount_point = '/root/airflow/package' + dag_volume_mount_path = worker_config.generate_dag_volume_mount_path() + self.assertEqual(dag_volume_mount_path, '/root/airflow/package') + self.kube_config.dags_volume_mount_point = '' + self.kube_config.dags_volume_claim = '' self.kube_config.dags_volume_host = '/host/airflow/dags' dag_volume_mount_path = worker_config.generate_dag_volume_mount_path()
