[ 
https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qian Yu reassigned AIRFLOW-6834:
--------------------------------

    Assignee: Qian Yu

> Fix flaky test 
> test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6834
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: tests
>    Affects Versions: 1.10.9
>            Reporter: Qian Yu
>            Assignee: Qian Yu
>            Priority: Major
>
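> test_scheduler_job.py has a few flaky tests. Some are marked with 
> pytest.mark.xfail, but this one is not marked as flaky, and it sometimes 
> fails in Travis CI. The failure is an ordering mismatch: the scheduled task 
> instances come back with dummy2 before dummy1, while the test asserts that 
> dummy1 comes first. For example: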
>  
> {code:python}
> ============================================================ FAILURES 
> ============================================================
> _____________________ 
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0
>  ______________________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
> state = None, start_date = None, end_date = None
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, 
> state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], 
> log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, 
> task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1)] == 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>), 1) != 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> _______________ 
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
>  _______________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 
> 42, 916304, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, 
> tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, 
> state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], 
> log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, 
> task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1)] == 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>), 1) != 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ____________ 
> TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
>  _____________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
>     @wraps(func)
>     def standalone_func(*a):
> >       return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 
> 26, 42, 916318, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, 
> tzinfo=<Timezone [UTC]>)
>     @parameterized.expand([
>         [State.NONE, None, None],
>         [State.UP_FOR_RETRY, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>         [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
> datetime.timedelta(minutes=30),
>          timezone.utcnow() - datetime.timedelta(minutes=15)],
>     ])
>     def test_dag_file_processor_process_task_instances_depends_on_past(self, 
> state, start_date, end_date):
>         """
>         Test if _process_task_instances puts the right task instances into the
>         mock_list.
>         """
>         dag = DAG(
>             dag_id='test_scheduler_process_execute_task_depends_on_past',
>             start_date=DEFAULT_DATE,
>             default_args={
>                 'depends_on_past': True,
>             },
>         )
>         dag_task1 = DummyOperator(
>             task_id='dummy1',
>             dag=dag,
>             owner='airflow')
>         dag_task2 = DummyOperator(
>             task_id='dummy2',
>             dag=dag,
>             owner='airflow')
>         with create_session() as session:
>             orm_dag = DagModel(dag_id=dag.dag_id)
>             session.merge(orm_dag)
>         dag_file_processor = DagFileProcessor(dag_ids=[], 
> log=mock.MagicMock())
>         dag.clear()
>         dr = dag_file_processor.create_dag_run(dag)
>         self.assertIsNotNone(dr)
>         with create_session() as session:
>             tis = dr.get_task_instances(session=session)
>             for ti in tis:
>                 ti.state = state
>                 ti.start_date = start_date
>                 ti.end_date = end_date
>         ti_to_schedule = []
>         dag_file_processor._process_task_instances(dag, 
> task_instances_list=ti_to_schedule)
> >       assert ti_to_schedule == [
>             (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
>             (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
>         ]
> E       AssertionError: assert 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>),\n  1)] == 
> [('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
> ('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
> E         At index 0 diff: 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, 
> +00:00:00, STD]>), 1) != 
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E         Full diff:
> E           [
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy2',
> E         ?         ^
> E         +   'dummy1',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E            ('test_scheduler_process_execute_task_depends_on_past',
> E         -   'dummy1',
> E         ?         ^
> E         +   'dummy2',
> E         ?         ^
> E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
> GMT, +00:00:00, STD]>),
> E         ?                                                       ----     
> ---------------------
> E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E             1),
> E           ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ======================================================== warnings summary 
> ========================================================
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
>   /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: 
> DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and 
> will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import 
> ImmutableDict' instead.
>     from werkzeug import ImmutableDict
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
>   /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: 
> DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will 
> be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' 
> instead.
>     from werkzeug import url_encode
> -- Docs: https://docs.pytest.org/en/latest/warnings.html
> ==================================================== short test summary info 
> =====================================================
> XFAIL 
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
>   This test is failing!
> XPASS 
> tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
>  The test is flaky with nondeterministic result
> FAILED 
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED 
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED 
> tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 
> warnings in 61.44s (0:01:01) ============================
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to