[
https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qian Yu updated AIRFLOW-6834:
-----------------------------
Description:
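test_scheduler_job.py has a few flaky tests. Some are marked with
pytest.mark.xfail, but this one is not marked as flaky, even though it
sometimes fails in Travis. In the failures below, the scheduled task instances
are returned in a different order (dummy2 before dummy1) from the one the
assertion expects (dummy1 before dummy2).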
Example 1:
{code:python}
============================================================ FAILURES
============================================================
_____________________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0
______________________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
state = None, start_date = None, end_date = None
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
_______________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
_______________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42,
916304, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone
[UTC]>)
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
____________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
_____________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13,
26, 42, 916318, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone
[UTC]>)
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
======================================================== warnings summary
========================================================
/usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
/usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19:
DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will
be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import
ImmutableDict' instead.
from werkzeug import ImmutableDict
/usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
/usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5:
DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be
removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead.
from werkzeug import url_encode
-- Docs: https://docs.pytest.org/en/latest/warnings.html
==================================================== short test summary info
=====================================================
XFAIL
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
This test is failing!
XPASS
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
The test is flaky with nondeterministic result
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
=========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2
warnings in 61.44s (0:01:01) ============================
{code}
Example 2:
Run py.test in Breeze as shown below, and {{test_start_and_terminate_run_as_user}}
fails. This happens because {{test_start_and_terminate_run_as_user}} implicitly
relies on a call to {{settings.configure_orm()}} made in
{{test_task_command.py}}. When {{test_task_command.py}} is not run beforehand,
{{test_start_and_terminate_run_as_user}} fails.
{code}
py.test --with-db-init tests/task/task_runner/test_standard_task_runner.py
XPASS
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
The test is flaky with nondeterministic result
XPASS
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
This test is failing!
ERROR
tests/task/task_runner/test_standard_task_runner.py::TestStandardTaskRunner::test_start_and_terminate_run_as_user
- sqlal...
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
{code}
was:
test_scheduler_job.py has a few flaky tests. Some are marked with
pytest.mark.xfail, but this one is not marked flaky. It sometimes fails in
Travis. For example:
{code:python}
============================================================ FAILURES
============================================================
_____________________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0
______________________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
state = None, start_date = None, end_date = None
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
_______________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
_______________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42,
916304, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone
[UTC]>)
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
____________
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
_____________
a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
@wraps(func)
def standalone_func(*a):
> return func(*(a + p.args), **p.kwargs)
/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13,
26, 42, 916318, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone
[UTC]>)
@parameterized.expand([
[State.NONE, None, None],
[State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
[State.UP_FOR_RESCHEDULE, timezone.utcnow() -
datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15)],
])
def test_dag_file_processor_process_task_instances_depends_on_past(self,
state, start_date, end_date):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
dag = DAG(
dag_id='test_scheduler_process_execute_task_depends_on_past',
start_date=DEFAULT_DATE,
default_args={
'depends_on_past': True,
},
)
dag_task1 = DummyOperator(
task_id='dummy1',
dag=dag,
owner='airflow')
dag_task2 = DummyOperator(
task_id='dummy2',
dag=dag,
owner='airflow')
with create_session() as session:
orm_dag = DagModel(dag_id=dag.dag_id)
session.merge(orm_dag)
dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dag.clear()
dr = dag_file_processor.create_dag_run(dag)
self.assertIsNotNone(dr)
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
ti_to_schedule = []
dag_file_processor._process_task_instances(dag,
task_instances_list=ti_to_schedule)
> assert ti_to_schedule == [
(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
(dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
]
E AssertionError: assert
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>),\n 1),\n ('test_scheduler_process_execute_task_depends_on_past',\n
'dummy1',\n datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),\n 1)] ==
[('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
E At index 0 diff:
('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00,
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E Full diff:
E [
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy2',
E ? ^
E + 'dummy1',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ('test_scheduler_process_execute_task_depends_on_past',
E - 'dummy1',
E ? ^
E + 'dummy2',
E ? ^
E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
GMT, +00:00:00, STD]>),
E ? ----
---------------------
E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E 1),
E ]
tests/jobs/test_scheduler_job.py:565: AssertionError
======================================================== warnings summary
========================================================
/usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
/usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19:
DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will
be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import
ImmutableDict' instead.
from werkzeug import ImmutableDict
/usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
/usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5:
DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be
removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead.
from werkzeug import url_encode
-- Docs: https://docs.pytest.org/en/latest/warnings.html
==================================================== short test summary info
=====================================================
XFAIL
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
This test is failing!
XPASS
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
The test is flaky with nondeterministic result
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
FAILED
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
=========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2
warnings in 61.44s (0:01:01) ============================
{code}
> Fix flaky test
> test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6834
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
> Project: Apache Airflow
> Issue Type: Bug
> Components: tests
> Affects Versions: 1.10.9
> Reporter: Qian Yu
> Assignee: Qian Yu
> Priority: Major
>
> test_scheduler_job.py has a few flaky tests. Some are marked with
> pytest.mark.xfail, but this one is not, and it intermittently fails on
> Travis.
>
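> Example 1: all three parameterized cases fail because {{ti_to_schedule}}
> contains the expected task instances in a different order (dummy2 before
> dummy1), and the assertion compares lists, so it is order-sensitive: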
>
> {code:python}
> ============================================================ FAILURES
> ============================================================
> _____________________
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0
> ______________________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
> state = None, start_date = None, end_date = None
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> _______________
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> _______________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26,
> 42, 916304, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315,
> tzinfo=<Timezone [UTC]>)
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ____________
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> _____________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13,
> 26, 42, 916318, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321,
> tzinfo=<Timezone [UTC]>)
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ======================================================== warnings summary
> ========================================================
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19:
> DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and
> will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import
> ImmutableDict' instead.
> from werkzeug import ImmutableDict
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5:
> DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will
> be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode'
> instead.
> from werkzeug import url_encode
> -- Docs: https://docs.pytest.org/en/latest/warnings.html
> ==================================================== short test summary info
> =====================================================
> XFAIL
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
> This test is failing!
> XPASS
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
> The test is flaky with nondeterministic result
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2
> warnings in 61.44s (0:01:01) ============================
> {code}
> Example 2:
> Run py.test in Breeze as shown below: {{test_start_and_terminate_run_as_user}}
> fails. This happens because {{test_start_and_terminate_run_as_user}}
> implicitly relies on a call to {{settings.configure_orm()}} made in
> {{test_task_command.py}}. When {{test_task_command.py}} is not run,
> {{test_start_and_terminate_run_as_user}} fails.
> {code}
> py.test --with-db-init tests/task/task_runner/test_standard_task_runner.py
> XPASS
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
> The test is flaky with nondeterministic result
> XPASS
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
> This test is failing!
> ERROR
> tests/task/task_runner/test_standard_task_runner.py::TestStandardTaskRunner::test_start_and_terminate_run_as_user
> - sqlal...
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)