ephraimbuddy opened a new issue #17439:
URL: https://github.com/apache/airflow/issues/17439


   `TestLocalTaskJob.test_process_sigterm_works_with_retries` fails intermittently:
   
   ```
   ___________ TestLocalTaskJob.test_process_sigterm_works_with_retries ___________
     
     self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f3d380ae1f0>
     dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f3d38917760>
     
         def test_process_sigterm_works_with_retries(self, dag_maker):
             """
             Test that ensures that task runner sets tasks to retry when they(task runner)
              receive sigterm
             """
             # use shared memory value so we can properly track value change even if
             # it's been updated across processes.
             retry_callback_called = Value('i', 0)
             task_terminated_externally = Value('i', 1)
             shared_mem_lock = Lock()
         
             def retry_callback(context):
                 with shared_mem_lock:
                     retry_callback_called.value += 1
                 assert context['dag_run'].dag_id == 'test_mark_failure_2'
         
             def task_function(ti):
                 time.sleep(60)
                 # This should not happen -- the state change should be noticed and the task should get killed
                 with shared_mem_lock:
                     task_terminated_externally.value = 0
         
             with dag_maker(dag_id='test_mark_failure_2'):
                 task = PythonOperator(
                     task_id='test_on_failure',
                     python_callable=task_function,
                     retries=1,
                     retry_delay=timedelta(seconds=2),
                     on_retry_callback=retry_callback,
                 )
             ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
             ti.refresh_from_db()
             job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
             job1.task_runner = StandardTaskRunner(job1)
             job1.task_runner.start()
             settings.engine.dispose()
             process = multiprocessing.Process(target=job1.run)
             process.start()
             for _ in range(0, 25):
                 ti.refresh_from_db()
                 if ti.state == State.RUNNING and ti.pid is not None:
                     break
                 time.sleep(0.2)
             os.kill(process.pid, signal.SIGTERM)
             process.join(timeout=10)
             ti.refresh_from_db()
     >       assert ti.state == State.UP_FOR_RETRY
     E       AssertionError: assert None == <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'>
     E        +  where None = <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01 00:00:00+00:00 [None]>.state
     E        +  and   <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> = State.UP_FOR_RETRY
     
     tests/jobs/test_local_task_job.py:828: AssertionError
     ----------------------------- Captured stdout call -----------------------------
     Running <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01T00:00:00+00:00 [None]> on host 67aa517c450e
     ----------------------------- Captured stderr call -----------------------------
     Process Process-113:
     Traceback (most recent call last):
       File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
         self.dialect.do_execute(
       File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
         cursor.execute(statement, parameters)
       File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
         res = self._query(query)
       File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
         db.query(q)
       File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
         _mysql.connection.query(self, query)
     MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000' for key 'task_instance.PRIMARY'")
   ```
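
One detail visible in the test above: the `for _ in range(0, 25)` polling loop has no `else` branch, so if the task instance never reaches `RUNNING` within roughly 5 seconds, the test still sends `SIGTERM` and the later assertion fails with `state` equal to `None`. This is not claimed to be the cause of the `IntegrityError` in the traceback. As a minimal sketch, assuming a hypothetical helper named `wait_until` (not part of Airflow or the test suite), the polling could raise on timeout so that a slow start is reported separately from a failure in the SIGTERM handling:

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.2):
    """Poll `predicate` until it returns True, or raise TimeoutError.

    Hypothetical helper for illustration only; the name and defaults are
    assumptions chosen to mirror the 25 x 0.2 s loop in the test.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Hypothetical use in the test, replacing the for/break loop:
#
#     def ti_is_running():
#         ti.refresh_from_db()
#         return ti.state == State.RUNNING and ti.pid is not None
#
#     wait_until(ti_is_running, timeout=5.0)
#     os.kill(process.pid, signal.SIGTERM)
```

With this shape, a run where the task never starts would surface as a `TimeoutError` at the wait step rather than as an `AssertionError` on `ti.state` further down.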
   

