potiuk opened a new issue #17363:
URL: https://github.com/apache/airflow/issues/17363
The test `TestLocalTaskJob.test_mark_success_on_success_callback` is flaky and fails intermittently:
```
____________ TestLocalTaskJob.test_mark_success_on_success_callback ____________

self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7fc28c71edd0>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7fc28ce718d0>

    def test_mark_success_on_success_callback(self, dag_maker):
        """
        Test that ensures that where a task is marked success in the UI
        on_success_callback gets executed
        """
        # use shared memory value so we can properly track value change even if
        # it's been updated across processes.
        success_callback_called = Value('i', 0)
        task_terminated_externally = Value('i', 1)
        shared_mem_lock = Lock()

        def success_callback(context):
            with shared_mem_lock:
                success_callback_called.value += 1
            assert context['dag_run'].dag_id == 'test_mark_success'

        def task_function(ti):
            time.sleep(60)
            # This should not happen -- the state change should be noticed and the task should get killed
            with shared_mem_lock:
                task_terminated_externally.value = 0

        with dag_maker(dag_id='test_mark_success', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}):
            task = PythonOperator(
                task_id='test_state_succeeded1',
                python_callable=task_function,
                on_success_callback=success_callback,
            )

        session = settings.Session()

        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        ti.refresh_from_db()
        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
        job1.task_runner = StandardTaskRunner(job1)

        settings.engine.dispose()
        process = multiprocessing.Process(target=job1.run)
        process.start()

        for _ in range(0, 25):
            ti.refresh_from_db()
            if ti.state == State.RUNNING:
                break
            time.sleep(0.2)
        assert ti.state == State.RUNNING
        ti.state = State.SUCCESS
        session.merge(ti)
        session.commit()

        ti.refresh_from_db()
>       process.join()

tests/jobs/test_local_task_job.py:563:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

/usr/local/lib/python3.7/multiprocessing/process.py:140: in join
    res = self._popen.wait(timeout)
/usr/local/lib/python3.7/multiprocessing/popen_fork.py:48: in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <multiprocessing.popen_fork.Popen object at 0x7fc28c996c50>, flag = 0

    def poll(self, flag=os.WNOHANG):
        if self.returncode is None:
            try:
>               pid, sts = os.waitpid(self.pid, flag)
E               Failed: Timeout >60.0s

/usr/local/lib/python3.7/multiprocessing/popen_fork.py:28: Failed
```
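For context on why this shows up as a pytest timeout rather than a normal assertion failure: the test calls `process.join()` without a timeout, so when the `LocalTaskJob` child never notices the externally set SUCCESS state, the join blocks inside `os.waitpid()` until the 60-second test timeout kills it. Below is a minimal sketch (not the test's actual code; the 30-second limit and the stand-in function are illustrative assumptions) of how a bounded join turns that hang into a fast, explicit failure:

```python
import multiprocessing
import time


def _simulated_task_runner():
    # Stand-in for job1.run(): a child process that may keep running
    # if it never observes the external state change.
    time.sleep(5)


if __name__ == "__main__":
    process = multiprocessing.Process(target=_simulated_task_runner)
    process.start()

    # Bounded join: give the child a fixed window to exit instead of
    # blocking indefinitely in os.waitpid().
    process.join(timeout=30)

    if process.is_alive():
        # Clean up so a hung child does not outlive the test,
        # then fail with an explicit message.
        process.terminate()
        process.join()
        raise AssertionError("LocalTaskJob child process did not exit in time")
```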