eejbyfeldt commented on issue #16847: URL: https://github.com/apache/airflow/issues/16847#issuecomment-876958347
We are also running into this issue. I believe this is a bug that was introduced in this PR: https://github.com/apache/airflow/pull/15172

I believe the problematic code is these lines, which set the task_instance state to `State.FAILED`: https://github.com/apache/airflow/blob/db6acd9e8a91e0eca9e12cace72edc57b2667d25/airflow/jobs/local_task_job.py#L82-L85

Here is a test case that can be added to `tests/jobs/test_local_task_job.py`. It fails and shows the incorrect behavior:

```
@pytest.mark.quarantined
def test_process_sigterm_retries(self):
    """
    Test that ensures that when a task is killed with SIGTERM,
    it follows the same retry logic as normal failures
    """
    dag = DAG(dag_id='test_mark_failure', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

    def task_function(ti):
        # Sleep long enough that the task is still running when SIGTERM arrives
        time.sleep(60)

    task = PythonOperator(
        task_id='test_on_failure',
        python_callable=task_function,
        retries=5,
        dag=dag,
    )

    session = settings.Session()

    dag.clear()
    dag.create_dagrun(
        run_id="test",
        state=State.RUNNING,
        execution_date=DEFAULT_DATE,
        start_date=DEFAULT_DATE,
        session=session,
    )
    ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
    ti.refresh_from_db()
    job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
    settings.engine.dispose()
    process = multiprocessing.Process(target=job1.run)
    process.start()

    # Wait until the task instance is running and has a pid
    for _ in range(0, 20):
        ti.refresh_from_db()
        if ti.state == State.RUNNING and ti.pid is not None:
            break
        time.sleep(0.2)
    assert ti.pid is not None
    assert ti.state == State.RUNNING

    # Kill the job process; with retries left, the task should be retried
    os.kill(process.pid, signal.SIGTERM)
    process.join(timeout=10)
    ti.refresh_from_db()
    assert ti.state == State.UP_FOR_RETRY
```
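To make the expected outcome concrete, here is a minimal sketch of the retry decision that the final assertion of the test relies on. It is a simplified model and not Airflow's actual implementation: `state_after_failure`, `attempts_made`, and the string constants are hypothetical names introduced only for illustration.

```python
# Hypothetical state names for this sketch; they stand in for Airflow's State values.
UP_FOR_RETRY = "up_for_retry"
FAILED = "failed"


def state_after_failure(attempts_made: int, retries: int) -> str:
    """Return the state a task should end up in after a failed attempt.

    attempts_made counts the attempt that just failed (the first attempt is 1).
    retries is the number of extra attempts allowed after the first one.
    """
    if retries > 0 and attempts_made <= retries:
        # At least one retry is still available.
        return UP_FOR_RETRY
    # No retries configured, or all of them are used up.
    return FAILED


if __name__ == "__main__":
    # Mirrors the test: retries=5, first attempt interrupted by SIGTERM.
    print(state_after_failure(attempts_made=1, retries=5))
```

Under this model, a task with `retries=5` that is killed during its first attempt should end up in the retry state, which is what the test expects. Setting the state to `State.FAILED` unconditionally on SIGTERM skips this decision.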
