Fokko closed pull request #3806: [AIRFLOW-2956] added kubernetes tolerations to 
kubernetes pod operator
URL: https://github.com/apache/incubator-airflow/pull/3806
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_kubernetes_operator.py 
b/airflow/contrib/example_dags/example_kubernetes_operator.py
index 4b3f54bd04..ba1f4433a7 100644
--- a/airflow/contrib/example_dags/example_kubernetes_operator.py
+++ b/airflow/contrib/example_dags/example_kubernetes_operator.py
@@ -38,6 +38,14 @@
         default_args=args,
         schedule_interval=None)
 
+    tolerations = [
+        {
+            'key': "key",
+            'operator': 'Equal',
+            'value': 'value'
+        }
+    ]
+
     k = KubernetesPodOperator(
         namespace='default',
         image="ubuntu:16.04",
@@ -49,7 +57,9 @@
         task_id="task",
         get_logs=True,
         dag=dag,
-        is_delete_operator_pod=False)
+        is_delete_operator_pod=False,
+        tolerations=tolerations
+    )
 
 except ImportError as e:
     log.warn("Could not import KubernetesPodOperator: " + str(e))
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 97bcdf2abc..bc424d57ab 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -186,3 +186,8 @@ def extract_image_pull_secrets(pod, req):
             req['spec']['imagePullSecrets'] = [{
                 'name': pull_secret
             } for pull_secret in pod.image_pull_secrets.split(',')]
+
+    @staticmethod
+    def extract_tolerations(pod, req):
+        if pod.tolerations:
+            req['spec']['tolerations'] = pod.tolerations
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 877d7aafe2..059026b320 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -60,6 +60,7 @@ def create(self, pod):
         self.extract_annotations(pod, req)
         self.extract_affinity(pod, req)
         self.extract_hostnetwork(pod, req)
+        self.extract_tolerations(pod, req)
         return req
 
 
@@ -118,4 +119,5 @@ def create(self, pod):
         self.extract_annotations(pod, req)
         self.extract_affinity(pod, req)
         self.extract_hostnetwork(pod, req)
+        self.extract_tolerations(pod, req)
         return req
diff --git a/airflow/contrib/kubernetes/pod.py 
b/airflow/contrib/kubernetes/pod.py
index 221c8f4180..5de23ff5bd 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -78,7 +78,8 @@ def __init__(
             resources=None,
             annotations=None,
             affinity=None,
-            hostnetwork=False
+            hostnetwork=False,
+            tolerations=None,
     ):
         self.image = image
         self.envs = envs or {}
@@ -100,3 +101,4 @@ def __init__(
         self.annotations = annotations or {}
         self.affinity = affinity or {}
         self.hostnetwork = hostnetwork or False
+        self.tolerations = tolerations or []
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index d3c396e668..d4f1013876 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -78,6 +78,8 @@ class KubernetesPodOperator(BaseOperator):
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
     :type xcom_push: bool
+    :param tolerations: Kubernetes tolerations
+    :type list of tolerations
     """
     template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
@@ -111,6 +113,7 @@ def execute(self, context):
             pod.affinity = self.affinity
             pod.node_selectors = self.node_selectors
             pod.hostnetwork = self.hostnetwork
+            pod.tolerations = self.tolerations
 
             launcher = pod_launcher.PodLauncher(kube_client=client,
                                                 extract_xcom=self.xcom_push)
@@ -158,6 +161,7 @@ def __init__(self,
                  service_account_name="default",
                  is_delete_operator_pod=False,
                  hostnetwork=False,
+                 tolerations=None,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -186,3 +190,4 @@ def __init__(self,
         self.service_account_name = service_account_name
         self.is_delete_operator_pod = is_delete_operator_pod
         self.hostnetwork = hostnetwork
+        self.tolerations = tolerations or []
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index 8a4cea5a73..f05f22c445 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -82,6 +82,14 @@ Kubernetes Operator
         }
     }
 
+    tolerations = [
+        {
+            'key': "key",
+            'operator': 'Equal',
+            'value': 'value'
+         }
+    ]
+
     k = KubernetesPodOperator(namespace='default',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
@@ -94,7 +102,8 @@ Kubernetes Operator
                               task_id="task",
                               affinity=affinity,
                               is_delete_operator_pod=True,
-                              hostnetwork=False
+                              hostnetwork=False,
+                              tolerations=tolerations
                               )
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to