jedcunningham commented on a change in pull request #19359:
URL: https://github.com/apache/airflow/pull/19359#discussion_r742106170



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):

Review comment:
       Might be nice to bring the comment from the tests here as well to 
explain why we might see these 2 responses.
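
For illustration, a sketch of how that explanatory comment might read once moved into the executor's `sync()` loop (the wording here is assumed, not taken from the tests in the PR):

```python
try:
    self.kube_scheduler.run_next(task)
except ApiException as e:
    # 400 (BadRequest) means the request itself was malformed and 422
    # (Unprocessable Entity) means the pod spec failed server-side validation;
    # neither will succeed on retry, so fail the task instead of re-queueing it.
    if e.status in (400, 422):
        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
```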

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called
-            assert not kubernetes_executor.task_queue.empty()
 
-            # Disable the ApiException
-            mock_kube_client.create_namespaced_pod.side_effect = None
+            if should_requeue:
+                assert not kubernetes_executor.task_queue.empty()
 
-            # Execute the task without errors should empty the queue
-            kubernetes_executor.sync()
-            assert mock_kube_client.create_namespaced_pod.called
-            assert kubernetes_executor.task_queue.empty()
+                # Disable the ApiException
+                mock_kube_client.create_namespaced_pod.side_effect = None
+
+                # Execute the task without errors should empty the queue
+                kubernetes_executor.sync()

Review comment:
       ```suggestion
                   mock_kube_client.create_namespaced_pod.reset_mock()
                   kubernetes_executor.sync()
   ```
   
   We need to reset the mock; otherwise we aren't checking that it was actually called again.
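
   A minimal self-contained sketch of why the reset matters (the mock name is illustrative): `.called` latches True after the first call, so without `reset_mock()` a later assertion can't tell whether the second `sync()` created a pod at all.

   ```python
   from unittest import mock

   client = mock.MagicMock()
   client.create_namespaced_pod()
   assert client.create_namespaced_pod.called        # True after the first call

   # Without reset_mock() this flag stays True, so a later `.called` check would
   # pass even if nothing called create_namespaced_pod again.
   client.create_namespaced_pod.reset_mock()
   assert not client.create_namespaced_pod.called    # call history cleared
   ```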

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -225,24 +251,30 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
             kubernetes_executor.start()
             # Execute a task while the Api Throws errors
             try_number = 1
+            task_instance_key = ('dag', 'task', 'run_id', try_number)
             kubernetes_executor.execute_async(
-                key=('dag', 'task', 'run_id', try_number),
+                key=task_instance_key,
                 queue=None,
                 command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
             )
             kubernetes_executor.sync()
             kubernetes_executor.sync()
 
             assert mock_kube_client.create_namespaced_pod.called

Review comment:
       ```suggestion
               mock_kube_client.create_namespaced_pod.assert_called_once()
   ```
   or
   ```suggestion
               assert mock_kube_client.create_namespaced_pod.call_count == 1
   ```
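   
   Both forms pin the call count to exactly one, whereas the bare `.called` attribute only checks "at least once". A small standalone illustration:

   ```python
   from unittest import mock

   client = mock.MagicMock()
   client.create_namespaced_pod(mock.sentinel.pod)

   # Either assertion fails if the pod was created zero or more than one time;
   # `assert client.create_namespaced_pod.called` would not catch extra calls.
   client.create_namespaced_pod.assert_called_once()
   assert client.create_namespaced_pod.call_count == 1
   ```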

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,59 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, should_requeue',
+        [
+            ('Forbidden', True),
+            ('fake-unhandled-reason', True),
+            ('Unprocessable Entity', False),
+            ('BadRequest', False),

Review comment:
       I think I'd prefer to see the status code here instead, particularly 
since we aren't using the `reason` for anything. Then we can also drop the map 
from inside the test body too.
   
   If we do need both, I think the mapping should happen here anyways.
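   
   A rough sketch of the status-code-only parametrization being suggested (parameter names and comments are illustrative, and the test body is elided):

   ```python
   import pytest

   @pytest.mark.parametrize(
       'status, should_requeue',
       [
           (403, True),      # Forbidden: treated as transient, task is re-queued
           (12345, True),    # unrecognised status: re-queue by default
           (422, False),     # Unprocessable Entity: fail the task
           (400, False),     # BadRequest: fail the task
       ],
   )
   def test_run_next_exception_requeue(status, should_requeue):
       # The real test would raise ApiException(status=status) from the mocked
       # create_namespaced_pod and assert whether the task_queue is empty afterwards.
       ...
   ```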

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       ```suggestion
                           self.log.error("Failing Task. Pod creation failed 
with reason: %r, message: %r", e.reason, json.loads(e.body)['message'],)
   ```
   
   Does it makes sense to include the message too? (warning, untested)
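   
   Since `ApiException.body` is the raw response body (a JSON string when it comes from the API server), a slightly more defensive version of that untested suggestion might look like this (the fallback handling is an assumption, not part of the PR):

   ```python
   import json

   try:
       message = json.loads(e.body).get('message', '')
   except (TypeError, ValueError, AttributeError):
       # The body can be empty or non-JSON, e.g. for exceptions built by hand in tests.
       message = e.body
   self.log.error(
       "Failing task. Pod creation failed with reason: %r, message: %r", e.reason, message
   )
   ```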

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -599,13 +599,14 @@ def sync(self) -> None:
                 try:
                     self.kube_scheduler.run_next(task)
                 except ApiException as e:
-                    if e.reason == "BadRequest":
-                        self.log.error("Request was invalid. Failing task")
+                    if e.status in (400, 422):
+                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)

Review comment:
       Ah, gotcha, no that makes sense. Thanks.

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -182,33 +182,52 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl
         mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())
 
 
-class TestKubernetesExecutor(unittest.TestCase):
+class TestKubernetesExecutor:
     """
     Tests if an ApiException from the Kube Client will cause the task to
     be rescheduled.
     """
 
-    def setUp(self) -> None:
+    def setup_method(self) -> None:
         self.kubernetes_executor = KubernetesExecutor()
         self.kubernetes_executor.job_id = "5"
 
-    @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason='kubernetes python package is not installed'
+    )
+    @pytest.mark.parametrize(
+        'reason, status, should_requeue',
+        [
+            ('Forbidden', 403, True),
+            ('fake-unhandled-reason', 12345, True),
+            ('Unprocessable Entity', 422, False),
+            ('BadRequest', 400, False),
+        ],
+    )
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
-    def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+    def test_run_next_exception_requeue(
+        self, mock_get_kube_client, mock_kubernetes_job_watcher, reason, status, should_requeue

Review comment:
       `reason` isn't used, right?



