[
https://issues.apache.org/jira/browse/AIRFLOW-3022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675403#comment-16675403
]
ASF GitHub Bot commented on AIRFLOW-3022:
-----------------------------------------
ashb closed pull request #3855: [AIRFLOW-3022] Add volume mount to
KubernetesExecutorConfig
URL: https://github.com/apache/incubator-airflow/pull/3855
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/contrib/example_dags/example_kubernetes_annotation.py b/airflow/contrib/example_dags/example_kubernetes_executor_config.py
similarity index 61%
rename from airflow/contrib/example_dags/example_kubernetes_annotation.py
rename to airflow/contrib/example_dags/example_kubernetes_executor_config.py
index 058baf6990..3aa70f862b 100644
--- a/airflow/contrib/example_dags/example_kubernetes_annotation.py
+++ b/airflow/contrib/example_dags/example_kubernetes_executor_config.py
@@ -20,6 +20,7 @@
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
+import os
args = {
'owner': 'airflow',
@@ -27,7 +28,7 @@
}
dag = DAG(
- dag_id='example_kubernetes_annotation', default_args=args,
+ dag_id='example_kubernetes_executor_config', default_args=args,
schedule_interval=None
)
@@ -36,6 +37,14 @@ def print_stuff():
print("annotated!")
+def test_volume_mount():
+ with open('/foo/volume_mount_test.txt', 'w') as foo:
+ foo.write('Hello')
+
+ rc = os.system("cat /foo/volume_mount_test.txt")
+ assert rc == 0
+
+
# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag,
@@ -45,3 +54,26 @@ def print_stuff():
}
}
)
+
+# You can mount a volume or a secret into the worker pod
+second_task = PythonOperator(
+ task_id="four_task", python_callable=test_volume_mount, dag=dag,
+ executor_config={
+ "KubernetesExecutor": {
+ "volumes": [
+ {
+ "name": "test-volume",
+ "hostPath": {"path": "/tmp/"},
+ },
+ ],
+ "volume_mounts": [
+ {
+ "mountPath": "/foo/",
+ "name": "test-volume",
+ },
+ ]
+ }
+ }
+)
+
+start_task.set_downstream(second_task)
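
For context, the executor_config in the example DAG above is intended to translate into the standard Kubernetes pod-spec fields for hostPath volumes and container volume mounts. A hand-written sketch of that fragment follows (the container name is illustrative and not taken from this diff):

pod_spec_fragment = {
    "volumes": [
        # hostPath volume exposing /tmp/ on the node
        {"name": "test-volume", "hostPath": {"path": "/tmp/"}},
    ],
    "containers": [
        {
            "name": "base",  # illustrative container name, not from this diff
            "volumeMounts": [
                # mounts the volume above at /foo/ inside the worker container
                {"name": "test-volume", "mountPath": "/foo/"},
            ],
        },
    ],
}
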
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index cf58169345..e23ff96402 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -40,7 +40,7 @@ class KubernetesExecutorConfig:
def __init__(self, image=None, image_pull_policy=None, request_memory=None,
request_cpu=None, limit_memory=None, limit_cpu=None,
gcp_service_account_key=None, node_selectors=None, affinity=None,
- annotations=None):
+ annotations=None, volumes=None, volume_mounts=None):
self.image = image
self.image_pull_policy = image_pull_policy
self.request_memory = request_memory
@@ -51,15 +51,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None,
self.node_selectors = node_selectors
self.affinity = affinity
self.annotations = annotations
+ self.volumes = volumes
+ self.volume_mounts = volume_mounts
def __repr__(self):
return "{}(image={}, image_pull_policy={}, request_memory={},
request_cpu={}, " \
"limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \
- "node_selectors={}, affinity={}, annotations={})" \
+ "node_selectors={}, affinity={}, annotations={}, volumes={}, " \
+ "volume_mounts={})" \
.format(KubernetesExecutorConfig.__name__, self.image,
self.image_pull_policy,
self.request_memory, self.request_cpu, self.limit_memory,
self.limit_cpu, self.gcp_service_account_key,
self.node_selectors,
- self.affinity, self.annotations)
+ self.affinity, self.annotations, self.volumes,
self.volume_mounts)
@staticmethod
def from_dict(obj):
@@ -83,6 +86,8 @@ def from_dict(obj):
node_selectors=namespaced.get('node_selectors', None),
affinity=namespaced.get('affinity', None),
annotations=namespaced.get('annotations', {}),
+ volumes=namespaced.get('volumes', []),
+ volume_mounts=namespaced.get('volume_mounts', []),
)
def as_dict(self):
@@ -97,6 +102,8 @@ def as_dict(self):
'node_selectors': self.node_selectors,
'affinity': self.affinity,
'annotations': self.annotations,
+ 'volumes': self.volumes,
+ 'volume_mounts': self.volume_mounts,
}
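
As the from_dict/as_dict hunks above show, the per-task settings live under the "KubernetesExecutor" key of executor_config and default to empty lists. A minimal standalone sketch of that lookup (simplified; the real class carries many more fields):

# Simplified illustration of the namespaced lookup; not the actual Airflow class.
executor_config = {
    "KubernetesExecutor": {
        "volumes": [{"name": "test-volume", "hostPath": {"path": "/tmp/"}}],
        "volume_mounts": [{"name": "test-volume", "mountPath": "/foo/"}],
    }
}

namespaced = executor_config.get("KubernetesExecutor", {})
volumes = namespaced.get("volumes", [])            # [] when the task sets no volumes
volume_mounts = namespaced.get("volume_mounts", [])

assert volumes[0]["name"] == volume_mounts[0]["name"] == "test-volume"
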
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 74658e384a..01298c5e60 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -186,6 +186,8 @@ def _construct_volume(name, claim):
def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date,
airflow_command, kube_executor_config):
volumes, volume_mounts = self.init_volumes_and_mounts()
+ volumes += kube_executor_config.volumes
+ volume_mounts += kube_executor_config.volume_mounts
worker_init_container_spec = self._get_init_containers(
copy.deepcopy(volume_mounts))
resources = Resources(
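
The merge in make_pod above is plain list concatenation, so the statically configured worker volumes and the per-task executor_config volumes both end up on the pod. A minimal sketch of that behaviour with illustrative values (the base volume name is made up, not taken from this diff):

base_volumes = [{"name": "airflow-dags", "emptyDir": {}}]               # illustrative static worker volume
task_volumes = [{"name": "test-volume", "hostPath": {"path": "/tmp/"}}]  # from executor_config

volumes = list(base_volumes)
volumes += task_volumes  # same += pattern as in make_pod above

assert [v["name"] for v in volumes] == ["airflow-dags", "test-volume"]
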
diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py
index 23bdfaec8d..7d3947799d 100644
--- a/tests/contrib/minikube/test_kubernetes_executor.py
+++ b/tests/contrib/minikube/test_kubernetes_executor.py
@@ -158,7 +158,7 @@ def start_dag(self, dag_id, host):
def test_integration_run_dag(self):
host = get_minikube_host()
- dag_id = 'example_kubernetes_annotation'
+ dag_id = 'example_kubernetes_executor_config'
result_json = self.start_dag(dag_id=dag_id, host=host)
@@ -181,7 +181,7 @@ def test_integration_run_dag(self):
def test_integration_run_dag_with_scheduler_failure(self):
host = get_minikube_host()
- dag_id = 'example_kubernetes_annotation'
+ dag_id = 'example_kubernetes_executor_config'
result_json = self.start_dag(dag_id=dag_id, host=host)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add volume mount to KubernetesExecutorConfig
> --------------------------------------------
>
> Key: AIRFLOW-3022
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3022
> Project: Apache Airflow
> Issue Type: Improvement
> Components: executor
> Affects Versions: 1.10.0
> Reporter: John Cheng
> Assignee: John Cheng
> Priority: Minor
> Fix For: 1.10.1
>
>
> Allows volumes to be mounted on the worker pod via executor_config.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)