eejbyfeldt edited a comment on issue #16847:
URL: https://github.com/apache/airflow/issues/16847#issuecomment-876958347


   We are also running into this issue. I believe the bug was introduced in 
this PR: https://github.com/apache/airflow/pull/15172, which means that 
2.1.0 and 2.1.1 should be affected.
   
   I believe the problematic code is in these lines:
   
https://github.com/apache/airflow/blob/db6acd9e8a91e0eca9e12cace72edc57b2667d25/airflow/jobs/local_task_job.py#L82-L85
   which set the task instance state to `State.FAILED`.
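
To make the mismatch concrete, here is a minimal, self-contained sketch. It does not use Airflow's code: `state_after_failure` and `state_after_sigterm` are hypothetical names, and the retry rule (retry while the attempt number does not exceed `retries`) is an assumption about what "normal" failure handling amounts to.

```python
from enum import Enum


class State(str, Enum):
    # Stand-ins for the two states discussed in this comment.
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"


def state_after_failure(retries: int, try_number: int) -> State:
    """Hypothetical model of normal failure handling.

    Assumption: a task is retried while its 1-based attempt number
    does not exceed the configured number of retries.
    """
    if retries > 0 and try_number <= retries:
        return State.UP_FOR_RETRY
    return State.FAILED


def state_after_sigterm(retries: int, try_number: int) -> State:
    """Hypothetical model of the behavior reported here.

    The state is set to FAILED without consulting the retry settings.
    """
    return State.FAILED
```

With `retries=5` on the first attempt, `state_after_failure(5, 1)` gives `State.UP_FOR_RETRY` while `state_after_sigterm(5, 1)` gives `State.FAILED`. That difference is what the test case in this comment is meant to catch.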
   
   Here is a test case that can be added to `tests/jobs/test_local_task_job.py`. 
It currently fails and shows the incorrect behavior:
   ```
       @pytest.mark.quarantined
       def test_process_sigterm_retries(self):
           """
           Test that a task killed with SIGTERM follows the same retry
           logic as a normal failure
           """
           dag = DAG(
               dag_id='test_mark_failure',
               start_date=DEFAULT_DATE,
               default_args={'owner': 'owner1'},
           )
   
           def task_function(ti):
               time.sleep(60)
   
           task = PythonOperator(
               task_id='test_on_failure',
               python_callable=task_function,
               retries=5,
               dag=dag,
           )
   
           session = settings.Session()
   
           dag.clear()
           dag.create_dagrun(
               run_id="test",
               state=State.RUNNING,
               execution_date=DEFAULT_DATE,
               start_date=DEFAULT_DATE,
               session=session,
           )
           ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
           ti.refresh_from_db()
           job1 = LocalTaskJob(
               task_instance=ti,
               ignore_ti_state=True,
               executor=SequentialExecutor(),
           )
   
           settings.engine.dispose()
           process = multiprocessing.Process(target=job1.run)
           process.start()
   
           for _ in range(20):
               ti.refresh_from_db()
               if ti.state == State.RUNNING and ti.pid is not None:
                   break
               time.sleep(0.2)
           assert ti.pid is not None
           assert ti.state == State.RUNNING
           os.kill(process.pid, signal.SIGTERM)
           process.join(timeout=10)
           ti.refresh_from_db()
           assert ti.state == State.UP_FOR_RETRY
   ```
   
   


