potiuk opened a new issue, #45145:
URL: https://github.com/apache/airflow/issues/45145

   The `test_integration_run_dag_with_scheduler_failure` test is flaky and fails
sometimes.
   
   For example, see this CI run:
https://github.com/apache/airflow/actions/runs/12452312988/job/34761457306
   
   ```python
   kubernetes_tests/test_other_executors.py .F                              
[100%]
     
     =================================== FAILURES 
===================================
     __ 
TestCeleryAndLocalExecutor.test_integration_run_dag_with_scheduler_failure __
     
     self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor 
object at 0x7f7c8d269f70>
     
         @pytest.mark.xfail(
             EXECUTOR == "LocalExecutor",
             reason="https://github.com/apache/airflow/issues/44481 needs to be 
implemented",
         )
         def test_integration_run_dag_with_scheduler_failure(self):
             dag_id = "example_xcom"
         
             dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, 
self.host)
         
             self._delete_airflow_pod("scheduler")
         
             time.sleep(10)  # give time for pod to restart
         
             # Wait some time for the operator to complete
     >       self.monitor_task(
                 host=self.host,
                 dag_run_id=dag_run_id,
                 dag_id=dag_id,
                 task_id="push",
                 expected_final_state="success",
                 timeout=40,  # This should fail fast if failing
             )
     
     kubernetes_tests/test_other_executors.py:71: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor 
object at 0x7f7c8d269f70>
     host = 'localhost:41460'
     dag_run_id = 'manual__2024-12-22T08:49:39.130207+00:00', dag_id = 'example_xcom'
     task_id = 'push', expected_final_state = 'success', timeout = 40
     
         def monitor_task(self, host, dag_run_id, dag_id, task_id, 
expected_final_state, timeout):
             tries = 0
             state = ""
             max_tries = max(int(timeout / 5), 1)
             # Wait some time for the operator to complete
             while tries < max_tries:
                 time.sleep(5)
                 # Check task state
                 try:
                     get_string = (
                         
f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
                     )
                     print(f"Calling [monitor_task]#1 {get_string}")
                     result = self.session.get(get_string)
                     if result.status_code == 404:
                         check_call(["echo", "api returned 404."])
                         tries += 1
                         continue
                     assert result.status_code == 200, "Could not get the 
status"
                     result_json = result.json()
                     print(f"Received [monitor_task]#2: {result_json}")
                     state = result_json["state"]
                     print(f"Attempt {tries}: Current state of operator is 
{state}")
         
                     if state == expected_final_state:
                         break
                     if state in {"failed", "upstream_failed", "removed"}:
                         # If the TI is in failed state (and that's not the 
state we want) there's no point
                         # continuing to poll, it won't change
                         break
                     self._describe_resources(namespace="airflow")
                     self._describe_resources(namespace="default")
                     tries += 1
                 except requests.exceptions.ConnectionError as e:
                     check_call(["echo", f"api call failed. trying again. error 
{e}"])
             if state != expected_final_state:
                 print(f"The expected state is wrong {state} != 
{expected_final_state} (expected)!")
     >       assert state == expected_final_state
     E       AssertionError: assert equals failed
     E         None       'success'
     
     kubernetes_tests/test_base.py:197: AssertionError
     ---------------------------- Captured stdout setup 
-----------------------------
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to