ashb closed pull request #3855: [AIRFLOW-3022] Add volume mount to 
KubernetesExecutorConfig
URL: https://github.com/apache/incubator-airflow/pull/3855
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_kubernetes_annotation.py 
b/airflow/contrib/example_dags/example_kubernetes_executor_config.py
similarity index 61%
rename from airflow/contrib/example_dags/example_kubernetes_annotation.py
rename to airflow/contrib/example_dags/example_kubernetes_executor_config.py
index 058baf6990..3aa70f862b 100644
--- a/airflow/contrib/example_dags/example_kubernetes_annotation.py
+++ b/airflow/contrib/example_dags/example_kubernetes_executor_config.py
@@ -20,6 +20,7 @@
 import airflow
 from airflow.operators.python_operator import PythonOperator
 from airflow.models import DAG
+import os
 
 args = {
     'owner': 'airflow',
@@ -27,7 +28,7 @@
 }
 
 dag = DAG(
-    dag_id='example_kubernetes_annotation', default_args=args,
+    dag_id='example_kubernetes_executor_config', default_args=args,
     schedule_interval=None
 )
 
@@ -36,6 +37,14 @@ def print_stuff():
     print("annotated!")
 
 
+def test_volume_mount():
+    with open('/foo/volume_mount_test.txt', 'w') as foo:
+        foo.write('Hello')
+
+    rc = os.system("cat /foo/volume_mount_test.txt")
+    assert rc == 0
+
+
 # You can use annotations on your kubernetes pods!
 start_task = PythonOperator(
     task_id="start_task", python_callable=print_stuff, dag=dag,
@@ -45,3 +54,26 @@ def print_stuff():
         }
     }
 )
+
+# You can mount volume or secret to the worker pod
+second_task = PythonOperator(
+    task_id="four_task", python_callable=test_volume_mount, dag=dag,
+    executor_config={
+        "KubernetesExecutor": {
+            "volumes": [
+                {
+                    "name": "test-volume",
+                    "hostPath": {"path": "/tmp/"},
+                },
+            ],
+            "volume_mounts": [
+                {
+                    "mountPath": "/foo/",
+                    "name": "test-volume",
+                },
+            ]
+        }
+    }
+)
+
+start_task.set_downstream(second_task)
diff --git a/airflow/contrib/executors/kubernetes_executor.py 
b/airflow/contrib/executors/kubernetes_executor.py
index cf58169345..e23ff96402 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -40,7 +40,7 @@ class KubernetesExecutorConfig:
     def __init__(self, image=None, image_pull_policy=None, request_memory=None,
                  request_cpu=None, limit_memory=None, limit_cpu=None,
                  gcp_service_account_key=None, node_selectors=None, 
affinity=None,
-                 annotations=None):
+                 annotations=None, volumes=None, volume_mounts=None):
         self.image = image
         self.image_pull_policy = image_pull_policy
         self.request_memory = request_memory
@@ -51,15 +51,18 @@ def __init__(self, image=None, image_pull_policy=None, 
request_memory=None,
         self.node_selectors = node_selectors
         self.affinity = affinity
         self.annotations = annotations
+        self.volumes = volumes
+        self.volume_mounts = volume_mounts
 
     def __repr__(self):
         return "{}(image={}, image_pull_policy={}, request_memory={}, 
request_cpu={}, " \
                "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
-               "node_selectors={}, affinity={}, annotations={})" \
+               "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
+               "volume_mounts={})" \
             .format(KubernetesExecutorConfig.__name__, self.image, 
self.image_pull_policy,
                     self.request_memory, self.request_cpu, self.limit_memory,
                     self.limit_cpu, self.gcp_service_account_key, 
self.node_selectors,
-                    self.affinity, self.annotations)
+                    self.affinity, self.annotations, self.volumes, 
self.volume_mounts)
 
     @staticmethod
     def from_dict(obj):
@@ -83,6 +86,8 @@ def from_dict(obj):
             node_selectors=namespaced.get('node_selectors', None),
             affinity=namespaced.get('affinity', None),
             annotations=namespaced.get('annotations', {}),
+            volumes=namespaced.get('volumes', []),
+            volume_mounts=namespaced.get('volume_mounts', []),
         )
 
     def as_dict(self):
@@ -97,6 +102,8 @@ def as_dict(self):
             'node_selectors': self.node_selectors,
             'affinity': self.affinity,
             'annotations': self.annotations,
+            'volumes': self.volumes,
+            'volume_mounts': self.volume_mounts,
         }
 
 
diff --git a/airflow/contrib/kubernetes/worker_configuration.py 
b/airflow/contrib/kubernetes/worker_configuration.py
index 74658e384a..01298c5e60 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -186,6 +186,8 @@ def _construct_volume(name, claim):
     def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, 
execution_date,
                  airflow_command, kube_executor_config):
         volumes, volume_mounts = self.init_volumes_and_mounts()
+        volumes += kube_executor_config.volumes
+        volume_mounts += kube_executor_config.volume_mounts
         worker_init_container_spec = self._get_init_containers(
             copy.deepcopy(volume_mounts))
         resources = Resources(
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py 
b/tests/contrib/minikube/test_kubernetes_executor.py
index 23bdfaec8d..7d3947799d 100644
--- a/tests/contrib/minikube/test_kubernetes_executor.py
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -158,7 +158,7 @@ def start_dag(self, dag_id, host):
 
     def test_integration_run_dag(self):
         host = get_minikube_host()
-        dag_id = 'example_kubernetes_annotation'
+        dag_id = 'example_kubernetes_executor_config'
 
         result_json = self.start_dag(dag_id=dag_id, host=host)
 
@@ -181,7 +181,7 @@ def test_integration_run_dag(self):
 
     def test_integration_run_dag_with_scheduler_failure(self):
         host = get_minikube_host()
-        dag_id = 'example_kubernetes_annotation'
+        dag_id = 'example_kubernetes_executor_config'
 
         result_json = self.start_dag(dag_id=dag_id, host=host)
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to