Fokko closed pull request #3960: [AIRFLOW-2966] Catch ApiException in the Kubernetes Executor
URL: https://github.com/apache/incubator-airflow/pull/3960
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index f9279cce54..2630a60ce4 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -125,3 +125,6 @@ hide_sensitive_variable_fields = True
 elasticsearch_host =
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 elasticsearch_end_of_log_mark = end_of_log
+
+[kubernetes]
+dags_volume_claim = default
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index de1f9f4235..f9e350d303 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -599,8 +599,14 @@ def sync(self):
             last_resource_version, session=self._session)
 
         if not self.task_queue.empty():
-            key, command, kube_executor_config = self.task_queue.get()
-            self.kube_scheduler.run_next((key, command, kube_executor_config))
+            task = self.task_queue.get()
+
+            try:
+                self.kube_scheduler.run_next(task)
+            except ApiException:
+                self.log.exception('ApiException when attempting ' +
+                                   'to run task, re-queueing.')
+                self.task_queue.put(task)
 
     def _change_state(self, key, state, pod_id):
         if state != State.RUNNING:
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index c203e18d5c..905beeec40 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -18,10 +18,13 @@
 import re
 import string
 import random
+from urllib3 import HTTPResponse
 from datetime import datetime
 
 try:
+    from kubernetes.client.rest import ApiException
     from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
     from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
@@ -81,6 +84,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
     Tests that if dags_volume_subpath/logs_volume_subpath configuration
     options are passed to worker pod config
     """
+
     def setUp(self):
         if AirflowKubernetesScheduler is None:
             self.skipTest("kubernetes python package is not installed")
@@ -152,5 +156,61 @@ def test_worker_environment_when_dags_folder_specified(self):
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
 
+class TestKubernetesExecutor(unittest.TestCase):
+    """
+    Tests if an ApiException from the Kube Client will cause the task to
+    be rescheduled.
+    """
+    @unittest.skipIf(AirflowKubernetesScheduler is None,
+                     'kubernetes python package is not installed')
+    @mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+
+        # When a quota is exceeded this is the ApiException we get
+        r = HTTPResponse()
+        r.body = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": "pods \"podname\" is forbidden: " +
+                       "exceeded quota: compute-resources, " +
+                       "requested: limits.memory=4Gi, " +
+                       "used: limits.memory=6508Mi, " +
+                       "limited: limits.memory=10Gi",
+            "reason": "Forbidden",
+            "details": {"name": "podname", "kind": "pods"},
+            "code": 403}
+        r.status = 403
+        r.reason = "Forbidden"
+
+        # A mock kube_client that throws errors when making a pod
+        mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)
+        mock_kube_client.create_namespaced_pod = mock.MagicMock(
+            side_effect=ApiException(http_resp=r))
+        mock_get_kube_client.return_value = mock_kube_client
+
+        kubernetesExecutor = KubernetesExecutor()
+        kubernetesExecutor.start()
+
+        # Execute a task while the Api Throws errors
+        kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow()),
+                                         command='command', executor_config={})
+        kubernetesExecutor.sync()
+        kubernetesExecutor.sync()
+
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertFalse(kubernetesExecutor.task_queue.empty())
+
+        # Disable the ApiException
+        mock_kube_client.create_namespaced_pod.side_effect = None
+
+        # Execute the task without errors should empty the queue
+        kubernetesExecutor.sync()
+        mock_kube_client.create_namespaced_pod.assert_called()
+        self.assertTrue(kubernetesExecutor.task_queue.empty())
+
+
 if __name__ == '__main__':
     unittest.main()
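
For readers skimming the change: the behavioral core of the patch is the try/except
around run_next() in sync(), which puts the task back on the queue so a later sync()
cycle can retry it. What follows is a minimal, self-contained sketch of that
catch-and-requeue pattern, not Airflow code; FlakyScheduler, the standalone sync()
helper, and the fallback ApiException class are illustrative stand-ins for the real
executor machinery.

# Sketch of the catch-and-requeue pattern introduced by this PR (illustrative only).
from queue import Queue

try:
    from kubernetes.client.rest import ApiException
except ImportError:
    class ApiException(Exception):
        """Fallback so the sketch runs without the kubernetes package installed."""


class FlakyScheduler:
    """Stand-in for the Kubernetes scheduler: the first run_next() call fails."""

    def __init__(self):
        self.calls = 0

    def run_next(self, task):
        self.calls += 1
        if self.calls == 1:
            # Simulate the quota-exceeded error the executor sees from the API server.
            raise ApiException(403, "Forbidden")


def sync(task_queue, scheduler):
    """Mirrors the patched sync(): on ApiException, requeue the task for retry."""
    if not task_queue.empty():
        task = task_queue.get()
        try:
            scheduler.run_next(task)
        except ApiException:
            task_queue.put(task)


task_queue = Queue()
task_queue.put((('dag', 'task', '2018-01-01'), 'command', {}))
scheduler = FlakyScheduler()

sync(task_queue, scheduler)   # first attempt raises; task is put back on the queue
assert not task_queue.empty()

sync(task_queue, scheduler)   # second attempt succeeds; queue is drained
assert task_queue.empty()

As in the test above, the first sync() leaves the task queued after the simulated
403, and the next sync() drains the queue once the error stops occurring.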
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services