potiuk opened a new issue, #45145: URL: https://github.com/apache/airflow/issues/45145
The test `test_integration_run_dag_with_scheduler_failure` is flaky and fails intermittently. For example, see https://github.com/apache/airflow/actions/runs/12452312988/job/34761457306 ```python kubernetes_tests/test_other_executors.py .F [100%] =================================== FAILURES =================================== __ TestCeleryAndLocalExecutor.test_integration_run_dag_with_scheduler_failure __ self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70> @pytest.mark.xfail( EXECUTOR == "LocalExecutor", reason="https://github.com/apache/airflow/issues/44481 needs to be implemented", ) def test_integration_run_dag_with_scheduler_failure(self): dag_id = "example_xcom" dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host) self._delete_airflow_pod("scheduler") time.sleep(10) # give time for pod to restart # Wait some time for the operator to complete > self.monitor_task( host=self.host, dag_run_id=dag_run_id, dag_id=dag_id, task_id="push", expected_final_state="success", timeout=40, # This should fail fast if failing ) kubernetes_tests/test_other_executors.py:71: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70> host = 'localhost:41460' dag_run_id = 'manual__2024-12-22T08:49:39.130207+00:00', dag_id = 'example_xcom' task_id = 'push', expected_final_state = 'success', timeout = 40 def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout): tries = 0 state = "" max_tries = max(int(timeout / 5), 1) # Wait some time for the operator to complete while tries < max_tries: time.sleep(5) # Check task state try: get_string = ( f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}" ) print(f"Calling [monitor_task]#1 {get_string}") result = 
self.session.get(get_string) if result.status_code == 404: check_call(["echo", "api returned 404."]) tries += 1 continue assert result.status_code == 200, "Could not get the status" result_json = result.json() print(f"Received [monitor_task]#2: {result_json}") state = result_json["state"] print(f"Attempt {tries}: Current state of operator is {state}") if state == expected_final_state: break if state in {"failed", "upstream_failed", "removed"}: # If the TI is in failed state (and that's not the state we want) there's no point # continuing to poll, it won't change break self._describe_resources(namespace="airflow") self._describe_resources(namespace="default") tries += 1 except requests.exceptions.ConnectionError as e: check_call(["echo", f"api call failed. trying again. error {e}"]) if state != expected_final_state: print(f"The expected state is wrong {state} != {expected_final_state} (expected)!") > assert state == expected_final_state E AssertionError: assert equals failed E None 'success' kubernetes_tests/test_base.py:197: AssertionError ---------------------------- Captured stdout setup ----------------------------- ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
