jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742106170
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+ if e.status in (400, 422):
Review comment:
Might be nice to bring the comment from the tests here as well to
explain why we might see these 2 responses.
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
kubernetes_executor.start()
# Execute a task while the Api Throws errors
try_number = 1
+ task_instance_key = ('dag', 'task', 'run_id', try_number)
kubernetes_executor.execute_async(
- key=('dag', 'task', 'run_id', try_number),
+ key=task_instance_key,
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
)
kubernetes_executor.sync()
kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called
- assert not kubernetes_executor.task_queue.empty()
- # Disable the ApiException
- mock_kube_client.create_namespaced_pod.side_effect = None
+ if should_requeue:
+ assert not kubernetes_executor.task_queue.empty()
- # Execute the task without errors should empty the queue
- kubernetes_executor.sync()
- assert mock_kube_client.create_namespaced_pod.called
- assert kubernetes_executor.task_queue.empty()
+ # Disable the ApiException
+ mock_kube_client.create_namespaced_pod.side_effect = None
+
+ # Execute the task without errors should empty the queue
+ kubernetes_executor.sync()
Review comment:
```suggestion
mock_kube_client.create_namespaced_pod.reset_mock()
kubernetes_executor.sync()
```
We need to reset the mock; otherwise `.called` is still true from the
earlier call, so we aren't checking that it was actually called again.
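A minimal standalone sketch of the problem, using only `unittest.mock`. The name `create_pod` is a hypothetical stand-in for `mock_kube_client.create_namespaced_pod`, and the direct calls stand in for what `sync()` would trigger:

```python
from unittest import mock

# Hypothetical stand-in for mock_kube_client.create_namespaced_pod.
create_pod = mock.Mock()

create_pod()  # stands in for the first attempt made during sync()
assert create_pod.called

# Nothing has called the mock again, yet `.called` is still True,
# so re-asserting it proves nothing about a second attempt.
assert create_pod.called

# After a reset, the call tracking starts from scratch.
create_pod.reset_mock()
assert not create_pod.called

create_pod()  # stands in for the retry after the error is disabled
assert create_pod.called
assert create_pod.call_count == 1
```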
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
kubernetes_executor.start()
# Execute a task while the Api Throws errors
try_number = 1
+ task_instance_key = ('dag', 'task', 'run_id', try_number)
kubernetes_executor.execute_async(
- key=('dag', 'task', 'run_id', try_number),
+ key=task_instance_key,
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
)
kubernetes_executor.sync()
kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called
Review comment:
```suggestion
mock_kube_client.create_namespaced_pod.assert_called_once()
```
or
```suggestion
assert mock_kube_client.create_namespaced_pod.call_count == 1
```
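For context, a small sketch of how the two styles differ from a bare `.called` check. This uses only `unittest.mock`; `create_pod` is a hypothetical stand-in for the mocked client method, not the real fixture:

```python
from unittest import mock

# Hypothetical stand-in for mock_kube_client.create_namespaced_pod.
create_pod = mock.Mock()
create_pod()
create_pod()

# `.called` only says "at least once"; it passes for any number of calls.
assert create_pod.called
assert create_pod.call_count == 2

# assert_called_once() raises AssertionError unless there was exactly one call.
try:
    create_pod.assert_called_once()
except AssertionError:
    called_once = False
else:
    called_once = True
```

Either suggested form would fail here, while the original `.called` assertion would pass.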
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, should_requeue',
+ [
+ ('Forbidden', True),
+ ('fake-unhandled-reason', True),
+ ('Unprocessable Entity', False),
+ ('BadRequest', False),
Review comment:
I think I'd prefer to see the status code here instead, particularly
since we aren't using the `reason` for anything. Then we can also drop the map
from inside the test body too.
If we do need both, I think the mapping should happen here anyways.
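Roughly what I have in mind, as a standalone sketch rather than the real test. `CASES` and `should_fail_task` are hypothetical names, and `500` is only an assumed stand-in for the "unhandled" case. The helper mirrors the `e.status in (400, 422)` check from the executor diff:

```python
# Hypothetical parameter table keyed on HTTP status instead of reason.
CASES = [
    (403, True),   # Forbidden
    (500, True),   # assumed stand-in for an unhandled status
    (422, False),  # Unprocessable Entity
    (400, False),  # Bad Request
]


def should_fail_task(status: int) -> bool:
    """Mirror of the `e.status in (400, 422)` check in the executor diff."""
    return status in (400, 422)


# A task is requeued exactly when it is not failed outright.
results = {status: not should_fail_task(status) for status, _ in CASES}

for status, should_requeue in CASES:
    assert results[status] == should_requeue
```

The same tuples could be passed to `pytest.mark.parametrize('status, should_requeue', ...)`, with no reason-to-status map needed in the test body.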
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+ if e.status in (400, 422):
+ self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
Review comment:
```suggestion
self.log.error(
    "Failing Task. Pod creation failed with reason: %r, message: %r",
    e.reason,
    json.loads(e.body)['message'],
)
```
Does it make sense to include the message too? (Warning: untested.)
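If the body could ever be missing or not valid JSON, a defensive variant might look like the sketch below. `extract_message` is a hypothetical helper, not something in Airflow or the kubernetes client, and the sample bodies are made up for illustration:

```python
import json


def extract_message(body):
    """Return the 'message' field of a JSON error body, or None.

    Hypothetical helper: tolerates a missing body, invalid JSON,
    a non-object payload, or an object without a 'message' key.
    """
    try:
        return json.loads(body)["message"]
    except (TypeError, ValueError, KeyError):
        return None


# Made-up sample bodies for illustration only.
ok = extract_message('{"kind": "Status", "message": "pod spec is invalid"}')
missing_key = extract_message('{"kind": "Status"}')
not_json = extract_message("<html>bad gateway</html>")
no_body = extract_message(None)
```

That way a malformed body would not raise a second exception from inside the `except ApiException` handler.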