Fokko closed pull request #3960: [AIRFLOW-2966] Catch ApiException in the 
Kubernetes Executor
URL: https://github.com/apache/incubator-airflow/pull/3960
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index f9279cce54..2630a60ce4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -125,3 +125,6 @@ hide_sensitive_variable_fields = True
 elasticsearch_host =
 elasticsearch_log_id_template = 
{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
+
+[kubernetes]
+dags_volume_claim = default
diff --git a/airflow/contrib/executors/kubernetes_executor.py 
b/airflow/contrib/executors/kubernetes_executor.py
index de1f9f4235..f9e350d303 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -599,8 +599,14 @@ def sync(self):
             last_resource_version, session=self._session)
 
         if not self.task_queue.empty():
-            key, command, kube_executor_config = self.task_queue.get()
-            self.kube_scheduler.run_next((key, command, kube_executor_config))
+            task = self.task_queue.get()
+
+            try:
+                self.kube_scheduler.run_next(task)
+            except ApiException:
+                self.log.exception('ApiException when attempting ' +
+                                   'to run task, re-queueing.')
+                self.task_queue.put(task)
 
     def _change_state(self, key, state, pod_id):
         if state != State.RUNNING:
diff --git a/tests/contrib/executors/test_kubernetes_executor.py 
b/tests/contrib/executors/test_kubernetes_executor.py
index c203e18d5c..905beeec40 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -18,10 +18,13 @@
 import re
 import string
 import random
+from urllib3 import HTTPResponse
 from datetime import datetime
 
 try:
+    from kubernetes.client.rest import ApiException
     from airflow.contrib.executors.kubernetes_executor import 
AirflowKubernetesScheduler
+    from airflow.contrib.executors.kubernetes_executor import 
KubernetesExecutor
     from airflow.contrib.kubernetes.worker_configuration import 
WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
@@ -81,6 +84,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     Tests that if dags_volume_subpath/logs_volume_subpath configuration
     options are passed to worker pod config
     """
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
@@ -152,5 +156,61 @@ def 
test_worker_environment_when_dags_folder_specified(self):
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
 
+class TestKubernetesExecutor(unittest.TestCase):
+    """
+    Tests if an ApiException from the Kube Client will cause the task to
+    be rescheduled.
+    """
+    @unittest.skipIf(AirflowKubernetesScheduler is None,
+                     'kubernetes python package is not installed')
+    
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
+    
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+    def test_run_next_exception(self, mock_get_kube_client, 
mock_kubernetes_job_watcher):
+
+        # When a quota is exceeded this is the ApiException we get
+        r = HTTPResponse()
+        r.body = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": "pods \"podname\" is forbidden: " +
+            "exceeded quota: compute-resources, " +
+            "requested: limits.memory=4Gi, " +
+            "used: limits.memory=6508Mi, " +
+            "limited: limits.memory=10Gi",
+            "reason": "Forbidden",
+            "details": {"name": "podname", "kind": "pods"},
+            "code": 403},
+        r.status = 403
+        r.reason = "Forbidden"
+
+        # A mock kube_client that throws errors when making a pod
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', 
autospec=True)
+        mock_kube_client.create_namespaced_pod = mock.MagicMock(
+            side_effect=ApiException(http_resp=r))
+        mock_get_kube_client.return_value = mock_kube_client
+
+        kubernetesExecutor = KubernetesExecutor()
+        kubernetesExecutor.start()
+
+        # Execute a task while the Api Throws errors
+        kubernetesExecutor.execute_async(key=('dag', 'task', 
datetime.utcnow()),
+                                         command='command', executor_config={})
+        kubernetesExecutor.sync()
+        kubernetesExecutor.sync()
+
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertFalse(kubernetesExecutor.task_queue.empty())
+
+        # Disable the ApiException
+        mock_kube_client.create_namespaced_pod.side_effect = None
+
+        # Execute the task without errors should empty the queue
+        kubernetesExecutor.sync()
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertTrue(kubernetesExecutor.task_queue.empty())
+
+
 if __name__ == '__main__':
     unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to