ephraimbuddy opened a new issue #17439:
URL: https://github.com/apache/airflow/issues/17439
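This test fails intermittently (flaky). In the failing run below, the task instance's state is `None` after SIGTERM instead of the expected `up_for_retry`. The spawned job process also crashed with a MySQL `IntegrityError` (duplicate entry for the `task_instance` primary key):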
```
___________ TestLocalTaskJob.test_process_sigterm_works_with_retries
___________
self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at
0x7f3d380ae1f0>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at
0x7f3d38917760>
def test_process_sigterm_works_with_retries(self, dag_maker):
"""
Test that ensures that task runner sets tasks to retry when
they(task runner)
receive sigterm
"""
# use shared memory value so we can properly track value change
even if
# it's been updated across processes.
retry_callback_called = Value('i', 0)
task_terminated_externally = Value('i', 1)
shared_mem_lock = Lock()
def retry_callback(context):
with shared_mem_lock:
retry_callback_called.value += 1
assert context['dag_run'].dag_id == 'test_mark_failure_2'
def task_function(ti):
time.sleep(60)
# This should not happen -- the state change should be noticed
and the task should get killed
with shared_mem_lock:
task_terminated_externally.value = 0
with dag_maker(dag_id='test_mark_failure_2'):
task = PythonOperator(
task_id='test_on_failure',
python_callable=task_function,
retries=1,
retry_delay=timedelta(seconds=2),
on_retry_callback=retry_callback,
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True,
executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
job1.task_runner.start()
settings.engine.dispose()
process = multiprocessing.Process(target=job1.run)
process.start()
for _ in range(0, 25):
ti.refresh_from_db()
if ti.state == State.RUNNING and ti.pid is not None:
break
time.sleep(0.2)
os.kill(process.pid, signal.SIGTERM)
process.join(timeout=10)
ti.refresh_from_db()
> assert ti.state == State.UP_FOR_RETRY
E AssertionError: assert None == <TaskInstanceState.UP_FOR_RETRY:
'up_for_retry'>
E + where None = <TaskInstance:
test_mark_failure_2.test_on_failure 2016-01-01 00:00:00+00:00 [None]>.state
E + and <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> =
State.UP_FOR_RETRY
tests/jobs/test_local_task_job.py:828: AssertionError
----------------------------- Captured stdout call
-----------------------------
Running <TaskInstance: test_mark_failure_2.test_on_failure
2016-01-01T00:00:00+00:00 [None]> on host 67aa517c450e
----------------------------- Captured stderr call
-----------------------------
Process Process-113:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py",
line 1276, in _execute_context
self.dialect.do_execute(
File
"/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py",
line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry
'test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000' for key
'task_instance.PRIMARY'")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]