potiuk opened a new issue #15754:
URL: https://github.com/apache/airflow/issues/15754
The test `test_mark_success_on_success_callback` is flaky:
```
____________ TestLocalTaskJob.test_mark_success_on_success_callback ____________
self = <tests.jobs.test_local_task_job.TestLocalTaskJob testMethod=test_mark_success_on_success_callback>
def test_mark_success_on_success_callback(self):
"""
Test that ensures that where a task is marked success in the UI
on_success_callback gets executed
"""
# use shared memory value so we can properly track value change even if
# it's been updated across processes.
success_callback_called = Value('i', 0)
task_terminated_externally = Value('i', 1)
shared_mem_lock = Lock()
def success_callback(context):
with shared_mem_lock:
success_callback_called.value += 1
assert context['dag_run'].dag_id == 'test_mark_success'
dag = DAG(dag_id='test_mark_success', start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})
def task_function(ti):
# pylint: disable=unused-argument
time.sleep(60)
# This should not happen -- the state change should be noticed
# and the task should get killed
with shared_mem_lock:
task_terminated_externally.value = 0
task = PythonOperator(
task_id='test_state_succeeded1',
python_callable=task_function,
on_success_callback=success_callback,
dag=dag,
)
session = settings.Session()
dag.clear()
dag.create_dagrun(
run_id="test",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session,
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True,
executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
settings.engine.dispose()
process = multiprocessing.Process(target=job1.run)
process.start()
for _ in range(0, 25):
ti.refresh_from_db()
if ti.state == State.RUNNING:
break
time.sleep(0.2)
assert ti.state == State.RUNNING
ti.state = State.SUCCESS
session.merge(ti)
session.commit()
process.join(timeout=10)
> assert success_callback_called.value == 1
E AssertionError: assert 0 == 1
E + where 0 = <Synchronized wrapper for c_int(0)>.value
tests/jobs/test_local_task_job.py:499: AssertionError
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]