potiuk opened a new issue #15754:
URL: https://github.com/apache/airflow/issues/15754


   The test_mark_success_on_success_callback is flaky
   
   ```
   ____________ TestLocalTaskJob.test_mark_success_on_success_callback 
____________
     
     self = <tests.jobs.test_local_task_job.TestLocalTaskJob 
testMethod=test_mark_success_on_success_callback>
     
         def test_mark_success_on_success_callback(self):
             """
             Test that ensures that where a task is marked success in the UI
             on_success_callback gets executed
             """
             # use shared memory value so we can properly track value change 
even if
             # it's been updated across processes.
             success_callback_called = Value('i', 0)
             task_terminated_externally = Value('i', 1)
             shared_mem_lock = Lock()
         
             def success_callback(context):
                 with shared_mem_lock:
                     success_callback_called.value += 1
                 assert context['dag_run'].dag_id == 'test_mark_success'
         
             dag = DAG(dag_id='test_mark_success', start_date=DEFAULT_DATE, 
default_args={'owner': 'owner1'})
         
             def task_function(ti):
                 # pylint: disable=unused-argument
                 time.sleep(60)
                 # This should not happen -- the state change should be noticed 
and the task should get killed
                 with shared_mem_lock:
                     task_terminated_externally.value = 0
         
             task = PythonOperator(
                 task_id='test_state_succeeded1',
                 python_callable=task_function,
                 on_success_callback=success_callback,
                 dag=dag,
             )
         
             session = settings.Session()
         
             dag.clear()
             dag.create_dagrun(
                 run_id="test",
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 start_date=DEFAULT_DATE,
                 session=session,
             )
             ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
             ti.refresh_from_db()
             job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, 
executor=SequentialExecutor())
             job1.task_runner = StandardTaskRunner(job1)
         
             settings.engine.dispose()
             process = multiprocessing.Process(target=job1.run)
             process.start()
         
             for _ in range(0, 25):
                 ti.refresh_from_db()
                 if ti.state == State.RUNNING:
                     break
                 time.sleep(0.2)
             assert ti.state == State.RUNNING
             ti.state = State.SUCCESS
             session.merge(ti)
             session.commit()
         
             process.join(timeout=10)
     >       assert success_callback_called.value == 1
     E       AssertionError: assert 0 == 1
     E        +  where 0 = <Synchronized wrapper for c_int(0)>.value
     
     tests/jobs/test_local_task_job.py:499: AssertionError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to