[
https://issues.apache.org/jira/browse/AIRFLOW-2956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647526#comment-16647526
]
ASF GitHub Bot commented on AIRFLOW-2956:
-----------------------------------------
Fokko closed pull request #3806: [AIRFLOW-2956] added kubernetes tolerations to
kubernetes pod operator
URL: https://github.com/apache/incubator-airflow/pull/3806
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once it
is merged, the diff is reproduced below for the sake of provenance:
diff --git a/airflow/contrib/example_dags/example_kubernetes_operator.py
b/airflow/contrib/example_dags/example_kubernetes_operator.py
index 4b3f54bd04..ba1f4433a7 100644
--- a/airflow/contrib/example_dags/example_kubernetes_operator.py
+++ b/airflow/contrib/example_dags/example_kubernetes_operator.py
@@ -38,6 +38,14 @@
default_args=args,
schedule_interval=None)
+ tolerations = [
+ {
+ 'key': "key",
+ 'operator': 'Equal',
+ 'value': 'value'
+ }
+ ]
+
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
@@ -49,7 +57,9 @@
task_id="task",
get_logs=True,
dag=dag,
- is_delete_operator_pod=False)
+ is_delete_operator_pod=False,
+ tolerations=tolerations
+ )
except ImportError as e:
log.warn("Could not import KubernetesPodOperator: " + str(e))
diff --git
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 97bcdf2abc..bc424d57ab 100644
---
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -186,3 +186,8 @@ def extract_image_pull_secrets(pod, req):
req['spec']['imagePullSecrets'] = [{
'name': pull_secret
} for pull_secret in pod.image_pull_secrets.split(',')]
+
+ @staticmethod
+ def extract_tolerations(pod, req):
+ if pod.tolerations:
+ req['spec']['tolerations'] = pod.tolerations
diff --git
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 877d7aafe2..059026b320 100644
---
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -60,6 +60,7 @@ def create(self, pod):
self.extract_annotations(pod, req)
self.extract_affinity(pod, req)
self.extract_hostnetwork(pod, req)
+ self.extract_tolerations(pod, req)
return req
@@ -118,4 +119,5 @@ def create(self, pod):
self.extract_annotations(pod, req)
self.extract_affinity(pod, req)
self.extract_hostnetwork(pod, req)
+ self.extract_tolerations(pod, req)
return req
diff --git a/airflow/contrib/kubernetes/pod.py
b/airflow/contrib/kubernetes/pod.py
index 221c8f4180..5de23ff5bd 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -78,7 +78,8 @@ def __init__(
resources=None,
annotations=None,
affinity=None,
- hostnetwork=False
+ hostnetwork=False,
+ tolerations=None,
):
self.image = image
self.envs = envs or {}
@@ -100,3 +101,4 @@ def __init__(
self.annotations = annotations or {}
self.affinity = affinity or {}
self.hostnetwork = hostnetwork or False
+ self.tolerations = tolerations or []
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py
b/airflow/contrib/operators/kubernetes_pod_operator.py
index d3c396e668..d4f1013876 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -78,6 +78,8 @@ class KubernetesPodOperator(BaseOperator):
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type xcom_push: bool
+ :param tolerations: Kubernetes tolerations
+ :type list of tolerations
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
@@ -111,6 +113,7 @@ def execute(self, context):
pod.affinity = self.affinity
pod.node_selectors = self.node_selectors
pod.hostnetwork = self.hostnetwork
+ pod.tolerations = self.tolerations
launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.xcom_push)
@@ -158,6 +161,7 @@ def __init__(self,
service_account_name="default",
is_delete_operator_pod=False,
hostnetwork=False,
+ tolerations=None,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -186,3 +190,4 @@ def __init__(self,
self.service_account_name = service_account_name
self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
+ self.tolerations = tolerations or []
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 8a4cea5a73..f05f22c445 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -82,6 +82,14 @@ Kubernetes Operator
}
}
+ tolerations = [
+ {
+ 'key': "key",
+ 'operator': 'Equal',
+ 'value': 'value'
+ }
+ ]
+
k = KubernetesPodOperator(namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
@@ -94,7 +102,8 @@ Kubernetes Operator
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
- hostnetwork=False
+ hostnetwork=False,
+ tolerations=tolerations
)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Kubernetes tolerations for pod operator
> ---------------------------------------
>
> Key: AIRFLOW-2956
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2956
> Project: Apache Airflow
> Issue Type: Improvement
> Components: kubernetes
> Affects Versions: 1.10.0
> Reporter: Justin Holmes
> Assignee: Justin Holmes
> Priority: Minor
> Fix For: 1.10.1
>
>
> Allowing users to specify Kubernetes tolerations would be nice.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)