dstandish commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r741641790
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+ if e.reason in ("BadRequest", "Unprocessable Entity"):
+ self.log.error(f"Pod creation failed with reason
{e.reason!r}. Failing task")
Review comment:
Yeah, I can see that point. Another reason I went with `reason` was,
well, that the code was already using `reason` for `BadRequest`. It also
occurred to me that the status codes might have less fidelity than the
reasons, i.e. be less precise, so that if we were to change this to check the
status code, we might catch more scenarios in this block than was
originally intended. E.g. if 400 can be either `BadRequest` or
`LessBadRequest`, we might start catching something we weren't supposed to.
But I've seen no evidence of this, and since the majority seems to be in
favor of the status code, I'll make the change.
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+ if e.reason in ("BadRequest", "Unprocessable Entity"):
+ self.log.error(f"Pod creation failed with reason
{e.reason!r}. Failing task")
Review comment:
And I've updated it; please take a look.
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher,
mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package
is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, should_requeue',
+ [
+ ('Forbidden', True),
+ ('fake-unhandled-reason', True),
+ ('Unprocessable Entity', False),
+ ('BadRequest', False),
Review comment:
yeah that's the right call.
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
try:
self.kube_scheduler.run_next(task)
except ApiException as e:
- if e.reason == "BadRequest":
- self.log.error("Request was invalid. Failing task")
+ if e.status in (400, 422):
+ self.log.error("Pod creation failed with reason %r.
Failing task", e.reason)
Review comment:
In this case, the exception is already logged within `run_pod_async`,
which catches, logs, and re-raises.
Probably anything that is going to immediately re-raise shouldn't bother
logging (so that the logging is controlled by whatever is handling the
exception). But in this case, that's what the code does, so it is probably
better to log only once, because it can be a bit verbose. Let me know your
thoughts.
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher,
mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package
is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, status, should_requeue',
+ [
+ ('Forbidden', 403, True),
+ ('fake-unhandled-reason', 12345, True),
+ ('Unprocessable Entity',422, False),
+ ('BadRequest',400, False),
+ ],
+ )
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
- def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watcher):
+ def test_run_next_exception_requeue(
+ self, mock_get_kube_client, mock_kubernetes_job_watcher, reason,
status, should_requeue
Review comment:
It provides a human-friendly test name. This is a pattern I've seen in
other Airflow tests, though perhaps more commonly the param would be called
`name`. What do you think?
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher,
mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package
is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, status, should_requeue',
+ [
+ ('Forbidden', 403, True),
+ ('fake-unhandled-reason', 12345, True),
+ ('Unprocessable Entity',422, False),
+ ('BadRequest',400, False),
+ ],
+ )
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
- def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watcher):
+ def test_run_next_exception_requeue(
+ self, mock_get_kube_client, mock_kubernetes_job_watcher, reason,
status, should_requeue
Review comment:
e.g.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher,
mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package
is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, status, should_requeue',
+ [
+ ('Forbidden', 403, True),
+ ('fake-unhandled-reason', 12345, True),
+ ('Unprocessable Entity',422, False),
+ ('BadRequest',400, False),
+ ],
+ )
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
- def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watcher):
+ def test_run_next_exception_requeue(
+ self, mock_get_kube_client, mock_kubernetes_job_watcher, reason,
status, should_requeue
Review comment:
though maybe this approach is better 🤷:

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher,
mock_client, mock_kube_cl
mock_delete_namespace.assert_called_with(pod_id, namespace,
body=mock_client.V1DeleteOptions())
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
"""
Tests if an ApiException from the Kube Client will cause the task to
be rescheduled.
"""
- def setUp(self) -> None:
+ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = "5"
- @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python
package is not installed')
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason='kubernetes python package
is not installed'
+ )
+ @pytest.mark.parametrize(
+ 'reason, status, should_requeue',
+ [
+ ('Forbidden', 403, True),
+ ('fake-unhandled-reason', 12345, True),
+ ('Unprocessable Entity',422, False),
+ ('BadRequest',400, False),
+ ],
+ )
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
- def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watcher):
+ def test_run_next_exception_requeue(
+ self, mock_get_kube_client, mock_kubernetes_job_watcher, reason,
status, should_requeue
Review comment:
ok updated
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client,
mock_kubernetes_job_watc
kubernetes_executor.start()
# Execute a task while the Api Throws errors
try_number = 1
+ task_instance_key = ('dag', 'task', 'run_id', try_number)
kubernetes_executor.execute_async(
- key=('dag', 'task', 'run_id', try_number),
+ key=task_instance_key,
queue=None,
command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
)
kubernetes_executor.sync()
kubernetes_executor.sync()
assert mock_kube_client.create_namespaced_pod.called
Review comment:
Yeah, it's odd...
The test actually calls `sync` two times prior to this line,
so depending on the scenario the call count could be 2.
But I will remove one of the `sync` calls and assert that the call count
== 1... it seems that must have been a mistake.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]