potiuk opened a new issue #17363:
URL: https://github.com/apache/airflow/issues/17363


   The test is flaky  and fails sometimes:
   
   ```
     ____________ TestLocalTaskJob.test_mark_success_on_success_callback 
____________
     
     self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 
0x7fc28c71edd0>
     dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 
0x7fc28ce718d0>
     
         def test_mark_success_on_success_callback(self, dag_maker):
             """
             Test that ensures that where a task is marked success in the UI
             on_success_callback gets executed
             """
             # use shared memory value so we can properly track value change 
even if
             # it's been updated across processes.
             success_callback_called = Value('i', 0)
             task_terminated_externally = Value('i', 1)
             shared_mem_lock = Lock()
         
             def success_callback(context):
                 with shared_mem_lock:
                     success_callback_called.value += 1
         
                 assert context['dag_run'].dag_id == 'test_mark_success'
         
             def task_function(ti):
                 time.sleep(60)
         
                 # This should not happen -- the state change should be noticed 
and the task should get killed
                 with shared_mem_lock:
                     task_terminated_externally.value = 0
         
             with dag_maker(dag_id='test_mark_success', 
start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}):
                 task = PythonOperator(
                     task_id='test_state_succeeded1',
                     python_callable=task_function,
                     on_success_callback=success_callback,
                 )
         
             session = settings.Session()
         
             ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
             ti.refresh_from_db()
             job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, 
executor=SequentialExecutor())
             job1.task_runner = StandardTaskRunner(job1)
         
             settings.engine.dispose()
             process = multiprocessing.Process(target=job1.run)
             process.start()
         
             for _ in range(0, 25):
                 ti.refresh_from_db()
                 if ti.state == State.RUNNING:
                     break
                 time.sleep(0.2)
             assert ti.state == State.RUNNING
             ti.state = State.SUCCESS
             session.merge(ti)
             session.commit()
             ti.refresh_from_db()
     >       process.join()
     
     tests/jobs/test_local_task_job.py:563: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     /usr/local/lib/python3.7/multiprocessing/process.py:140: in join
         res = self._popen.wait(timeout)
     /usr/local/lib/python3.7/multiprocessing/popen_fork.py:48: in wait
         return self.poll(os.WNOHANG if timeout == 0.0 else 0)
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <multiprocessing.popen_fork.Popen object at 0x7fc28c996c50>, flag = 0
     
         def poll(self, flag=os.WNOHANG):
             if self.returncode is None:
                 try:
     >               pid, sts = os.waitpid(self.pid, flag)
     E               Failed: Timeout >60.0s
     
     /usr/local/lib/python3.7/multiprocessing/popen_fork.py:28: Failed
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to