ephraimbuddy opened a new issue #17444:
URL: https://github.com/apache/airflow/issues/17444
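The test `TestLocalTaskJob.test_task_sigkill_works_with_retries` is flaky and fails intermittently. In the failing run below (MySQL backend), the child process started for `job1.run` crashes with an `IntegrityError` (duplicate primary key `test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000`). As a result, the task instance's state is still `None` when the test asserts `UP_FOR_RETRY`: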
```
____________ TestLocalTaskJob.test_task_sigkill_works_with_retries _____________
self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at
0x7f3a8ec6c6a0>
dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at
0x7f3aa1490080>
def test_task_sigkill_works_with_retries(self, dag_maker):
"""
Test that ensures that tasks are retried when they receive sigkill
"""
# use shared memory value so we can properly track value change even if
# it's been updated across processes.
retry_callback_called = Value('i', 0)
task_terminated_externally = Value('i', 1)
shared_mem_lock = Lock()
def retry_callback(context):
with shared_mem_lock:
retry_callback_called.value += 1
assert context['dag_run'].dag_id == 'test_mark_failure_2'
def task_function(ti):
os.kill(os.getpid(), signal.SIGKILL)
# This should not happen -- the state change should be noticed and the task should get killed
with shared_mem_lock:
task_terminated_externally.value = 0
with dag_maker(
dag_id='test_mark_failure_2', start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'}
):
task = PythonOperator(
task_id='test_on_failure',
python_callable=task_function,
retries=1,
retry_delay=timedelta(seconds=2),
on_retry_callback=retry_callback,
)
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True,
executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
job1.task_runner.start()
settings.engine.dispose()
process = multiprocessing.Process(target=job1.run)
process.start()
time.sleep(0.4)
process.join(timeout=10)
ti.refresh_from_db()
> assert ti.state == State.UP_FOR_RETRY
E AssertionError: assert None == <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'>
E + where None = <TaskInstance: test_mark_failure_2.test_on_failure 2016-01-01 00:00:00+00:00 [None]>.state
E + and <TaskInstanceState.UP_FOR_RETRY: 'up_for_retry'> = State.UP_FOR_RETRY
tests/jobs/test_local_task_job.py:778: AssertionError
----------------------------- Captured stdout call
-----------------------------
Running <TaskInstance: test_mark_failure_2.test_on_failure
2016-01-01T00:00:00+00:00 [None]> on host 9a517fc8fb98
----------------------------- Captured stderr call
-----------------------------
Process Process-116:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py",
line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry
'test_on_failure-test_mark_failure_2-2016-01-01 00:00:00.000000' for key
'PRIMARY'")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]