[
https://issues.apache.org/jira/browse/AIRFLOW-2966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692998#comment-16692998
]
ASF GitHub Bot commented on AIRFLOW-2966:
-----------------------------------------
Fokko closed pull request #4209: [AIRFLOW-2966] Catch ApiException in the
Kubernetes Executor
URL: https://github.com/apache/incubator-airflow/pull/4209
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request on merge, the diff is reproduced
below for the sake of provenance:
diff --git a/airflow/config_templates/default_test.cfg
b/airflow/config_templates/default_test.cfg
index 6baec130b3..f0a467894d 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -126,3 +126,6 @@ hide_sensitive_variable_fields = True
elasticsearch_host =
elasticsearch_log_id_template =
{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
elasticsearch_end_of_log_mark = end_of_log
+
+[kubernetes]
+dags_volume_claim = default
diff --git a/airflow/contrib/executors/kubernetes_executor.py
b/airflow/contrib/executors/kubernetes_executor.py
index e23ff96402..6c1bd222b9 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -607,8 +607,14 @@ def sync(self):
last_resource_version, session=self._session)
if not self.task_queue.empty():
- key, command, kube_executor_config = self.task_queue.get()
- self.kube_scheduler.run_next((key, command, kube_executor_config))
+ task = self.task_queue.get()
+
+ try:
+ self.kube_scheduler.run_next(task)
+ except ApiException:
+ self.log.exception('ApiException when attempting ' +
+ 'to run task, re-queueing.')
+ self.task_queue.put(task)
def _change_state(self, key, state, pod_id):
if state != State.RUNNING:
diff --git a/tests/contrib/executors/test_kubernetes_executor.py
b/tests/contrib/executors/test_kubernetes_executor.py
index c203e18d5c..1307e500cf 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -18,10 +18,13 @@
import re
import string
import random
+from urllib3 import HTTPResponse
from datetime import datetime
try:
+ from kubernetes.client.rest import ApiException
from airflow.contrib.executors.kubernetes_executor import
AirflowKubernetesScheduler
+ from airflow.contrib.executors.kubernetes_executor import
KubernetesExecutor
from airflow.contrib.kubernetes.worker_configuration import
WorkerConfiguration
except ImportError:
AirflowKubernetesScheduler = None
@@ -81,6 +84,7 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
Tests that if dags_volume_subpath/logs_volume_subpath configuration
options are passed to worker pod config
"""
+
def setUp(self):
if AirflowKubernetesScheduler is None:
self.skipTest("kubernetes python package is not installed")
@@ -152,5 +156,62 @@ def
test_worker_environment_when_dags_folder_specified(self):
self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+class TestKubernetesExecutor(unittest.TestCase):
+ """
+ Tests if an ApiException from the Kube Client will cause the task to
+ be rescheduled.
+ """
+ @unittest.skipIf(AirflowKubernetesScheduler is None,
+ 'kubernetes python package is not installed')
+
@mock.patch('airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher')
+
@mock.patch('airflow.contrib.executors.kubernetes_executor.get_kube_client')
+ def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watcher):
+
+ # When a quota is exceeded this is the ApiException we get
+ r = HTTPResponse()
+ r.body = {
+ "kind": "Status",
+ "apiVersion": "v1",
+ "metadata": {},
+ "status": "Failure",
+ "message": "pods \"podname\" is forbidden: " +
+ "exceeded quota: compute-resources, " +
+ "requested: limits.memory=4Gi, " +
+ "used: limits.memory=6508Mi, " +
+ "limited: limits.memory=10Gi",
+ "reason": "Forbidden",
+ "details": {"name": "podname", "kind": "pods"},
+ "code": 403},
+ r.status = 403
+ r.reason = "Forbidden"
+
+ # A mock kube_client that throws errors when making a pod
+ mock_kube_client = mock.patch('kubernetes.client.CoreV1Api',
autospec=True)
+ mock_kube_client.create_namespaced_pod = mock.MagicMock(
+ side_effect=ApiException(http_resp=r))
+ mock_get_kube_client.return_value = mock_kube_client
+
+ kubernetesExecutor = KubernetesExecutor()
+ kubernetesExecutor.start()
+
+ # Execute a task while the Api Throws errors
+ try_number = 1
+ kubernetesExecutor.execute_async(key=('dag', 'task',
datetime.utcnow(), try_number),
+ command='command', executor_config={})
+ kubernetesExecutor.sync()
+ kubernetesExecutor.sync()
+
+ mock_kube_client.create_namespaced_pod.assert_called()
+ self.assertFalse(kubernetesExecutor.task_queue.empty())
+
+ # Disable the ApiException
+ mock_kube_client.create_namespaced_pod.side_effect = None
+
+ # Execute the task without errors should empty the queue
+ kubernetesExecutor.sync()
+ mock_kube_client.create_namespaced_pod.assert_called()
+ self.assertTrue(kubernetesExecutor.task_queue.empty())
+
+
if __name__ == '__main__':
unittest.main()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> KubernetesExecutor + namespace quotas kills scheduler if the pod can't be
> launched
> ----------------------------------------------------------------------------------
>
> Key: AIRFLOW-2966
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2966
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 2.0.0
> Environment: Kubernetes 1.9.8
> Reporter: John Hofman
> Assignee: John Hofman
> Priority: Major
> Fix For: 2.0.0
>
>
> When Airflow runs in Kubernetes with the KubernetesExecutor and resource
> quotas are set on the namespace Airflow is deployed in, a scheduler attempt
> to launch a pod that would exceed the namespace limits fails with an
> ApiException, which crashes the scheduler.
> This stack trace is an example of the ApiException from the kubernetes client:
> {code:java}
> [2018-08-27 09:51:08,516] {pod_launcher.py:58} ERROR - Exception when
> attempting to create Namespaced Pod.
> Traceback (most recent call last):
> File "/src/apache-airflow/airflow/contrib/kubernetes/pod_launcher.py", line
> 55, in run_pod_async
> resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
> File
> "/usr/local/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py",
> line 6057, in create_namespaced_pod
> (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)
> File
> "/usr/local/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py",
> line 6142, in create_namespaced_pod_with_http_info
> collection_formats=collection_formats)
> File
> "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py",
> line 321, in call_api
> _return_http_data_only, collection_formats, _preload_content,
> _request_timeout)
> File
> "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py",
> line 155, in __call_api
> _request_timeout=_request_timeout)
> File
> "/usr/local/lib/python3.6/site-packages/kubernetes/client/api_client.py",
> line 364, in request
> body=body)
> File "/usr/local/lib/python3.6/site-packages/kubernetes/client/rest.py", line
> 266, in POST
> body=body)
> File "/usr/local/lib/python3.6/site-packages/kubernetes/client/rest.py", line
> 222, in request
> raise ApiException(http_resp=r)
> kubernetes.client.rest.ApiException: (403)
> Reason: Forbidden
> HTTP response headers: HTTPHeaderDict({'Audit-Id':
> 'b00e2cbb-bdb2-41f3-8090-824aee79448c', 'Content-Type': 'application/json',
> 'Date': 'Mon, 27 Aug 2018 09:51:08 GMT', 'Content-Length': '410'})
> HTTP response body:
> {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods
> \"podname-ec366e89ef934d91b2d3ffe96234a725\" is forbidden: exceeded quota:
> compute-resources, requested: limits.memory=4Gi, used: limits.memory=6508Mi,
> limited:
> limits.memory=10Gi","reason":"Forbidden","details":{"name":"podname-ec366e89ef934d91b2d3ffe96234a725","kind":"pods"},"code":403}{code}
>
> I would expect the scheduler to catch the exception and at least mark the
> task as failed, or, better yet, retry the task later.
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)