Qian Yu created AIRFLOW-6834:
--------------------------------

             Summary: Fix flaky test 
test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
                 Key: AIRFLOW-6834
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
             Project: Apache Airflow
          Issue Type: Bug
          Components: tests
    Affects Versions: 1.10.9
            Reporter: Qian Yu


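test_scheduler_job.py has a few flaky tests. Some are already marked with 
pytest.mark.xfail, but this one is not marked as flaky, and it sometimes fails 
on Travis. In the run below, all three parameterized cases appear to fail 
because the task instances are returned in a different order (dummy2 before 
dummy1) than the order-sensitive list comparison in the assertion expects. 
For example: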

 
{code:python}
============================================================ FAILURES 
============================================================
_____________________ 
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0
 ______________________

a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)

    @wraps(func)
    def standalone_func(*a):
>       return func(*(a + p.args), **p.kwargs)

/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
state = None, start_date = None, end_date = None

    @parameterized.expand([
        [State.NONE, None, None],
        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
    ])
    def test_dag_file_processor_process_task_instances_depends_on_past(self, 
state, start_date, end_date):
        """
        Test if _process_task_instances puts the right task instances into the
        mock_list.
        """
        dag = DAG(
            dag_id='test_scheduler_process_execute_task_depends_on_past',
            start_date=DEFAULT_DATE,
            default_args={
                'depends_on_past': True,
            },
        )
        dag_task1 = DummyOperator(
            task_id='dummy1',
            dag=dag,
            owner='airflow')
        dag_task2 = DummyOperator(
            task_id='dummy2',
            dag=dag,
            owner='airflow')

        with create_session() as session:
            orm_dag = DagModel(dag_id=dag.dag_id)
            session.merge(orm_dag)

        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
        dag.clear()
        dr = dag_file_processor.create_dag_run(dag)
        self.assertIsNotNone(dr)

        with create_session() as session:
            tis = dr.get_task_instances(session=session)
            for ti in tis:
                ti.state = state
                ti.start_date = start_date
                ti.end_date = end_date

        ti_to_schedule = []
        dag_file_processor._process_task_instances(dag, 
task_instances_list=ti_to_schedule)

>       assert ti_to_schedule == [
            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
        ]
E       AssertionError: assert 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  
'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),\n  1)] == 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
E         At index 0 diff: 
('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E         Full diff:
E           [
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy2',
E         ?         ^
E         +   'dummy1',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy1',
E         ?         ^
E         +   'dummy2',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E           ]

tests/jobs/test_scheduler_job.py:565: AssertionError
_______________ 
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
 _______________

a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)

    @wraps(func)
    def standalone_func(*a):
>       return func(*(a + p.args), **p.kwargs)

/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26, 42, 
916304, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315, tzinfo=<Timezone 
[UTC]>)

    @parameterized.expand([
        [State.NONE, None, None],
        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
    ])
    def test_dag_file_processor_process_task_instances_depends_on_past(self, 
state, start_date, end_date):
        """
        Test if _process_task_instances puts the right task instances into the
        mock_list.
        """
        dag = DAG(
            dag_id='test_scheduler_process_execute_task_depends_on_past',
            start_date=DEFAULT_DATE,
            default_args={
                'depends_on_past': True,
            },
        )
        dag_task1 = DummyOperator(
            task_id='dummy1',
            dag=dag,
            owner='airflow')
        dag_task2 = DummyOperator(
            task_id='dummy2',
            dag=dag,
            owner='airflow')

        with create_session() as session:
            orm_dag = DagModel(dag_id=dag.dag_id)
            session.merge(orm_dag)

        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
        dag.clear()
        dr = dag_file_processor.create_dag_run(dag)
        self.assertIsNotNone(dr)

        with create_session() as session:
            tis = dr.get_task_instances(session=session)
            for ti in tis:
                ti.state = state
                ti.start_date = start_date
                ti.end_date = end_date

        ti_to_schedule = []
        dag_file_processor._process_task_instances(dag, 
task_instances_list=ti_to_schedule)

>       assert ti_to_schedule == [
            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
        ]
E       AssertionError: assert 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  
'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),\n  1)] == 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
E         At index 0 diff: 
('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E         Full diff:
E           [
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy2',
E         ?         ^
E         +   'dummy1',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy1',
E         ?         ^
E         +   'dummy2',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E           ]

tests/jobs/test_scheduler_job.py:565: AssertionError
____________ 
TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
 _____________

a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)

    @wraps(func)
    def standalone_func(*a):
>       return func(*(a + p.args), **p.kwargs)

/usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tests.jobs.test_scheduler_job.TestDagFileProcessor 
testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13, 
26, 42, 916318, tzinfo=<Timezone [UTC]>)
end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321, tzinfo=<Timezone 
[UTC]>)

    @parameterized.expand([
        [State.NONE, None, None],
        [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
        [State.UP_FOR_RESCHEDULE, timezone.utcnow() - 
datetime.timedelta(minutes=30),
         timezone.utcnow() - datetime.timedelta(minutes=15)],
    ])
    def test_dag_file_processor_process_task_instances_depends_on_past(self, 
state, start_date, end_date):
        """
        Test if _process_task_instances puts the right task instances into the
        mock_list.
        """
        dag = DAG(
            dag_id='test_scheduler_process_execute_task_depends_on_past',
            start_date=DEFAULT_DATE,
            default_args={
                'depends_on_past': True,
            },
        )
        dag_task1 = DummyOperator(
            task_id='dummy1',
            dag=dag,
            owner='airflow')
        dag_task2 = DummyOperator(
            task_id='dummy2',
            dag=dag,
            owner='airflow')

        with create_session() as session:
            orm_dag = DagModel(dag_id=dag.dag_id)
            session.merge(orm_dag)

        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
        dag.clear()
        dr = dag_file_processor.create_dag_run(dag)
        self.assertIsNotNone(dr)

        with create_session() as session:
            tis = dr.get_task_instances(session=session)
            for ti in tis:
                ti.state = state
                ti.start_date = start_date
                ti.end_date = end_date

        ti_to_schedule = []
        dag_file_processor._process_task_instances(dag, 
task_instances_list=ti_to_schedule)

>       assert ti_to_schedule == [
            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
            (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
        ]
E       AssertionError: assert 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>),\n  1),\n ('test_scheduler_process_execute_task_depends_on_past',\n  
'dummy1',\n  datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),\n  1)] == 
[('test_scheduler_process_execute_task_depends_on_past',\n  'dummy1',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1),\n 
('test_scheduler_process_execute_task_depends_on_past',\n  'dummy2',\n  
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n  1)]
E         At index 0 diff: 
('test_scheduler_process_execute_task_depends_on_past', 'dummy2', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, 
STD]>), 1) != ('test_scheduler_process_execute_task_depends_on_past', 'dummy1', 
datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
E         Full diff:
E           [
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy2',
E         ?         ^
E         +   'dummy1',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E            ('test_scheduler_process_execute_task_depends_on_past',
E         -   'dummy1',
E         ?         ^
E         +   'dummy2',
E         ?         ^
E         -   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, 
GMT, +00:00:00, STD]>),
E         ?                                                       ----     
---------------------
E         +   datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
E             1),
E           ]

tests/jobs/test_scheduler_job.py:565: AssertionError
======================================================== warnings summary 
========================================================
/usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
  /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19: 
DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and will 
be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import 
ImmutableDict' instead.
    from werkzeug import ImmutableDict

/usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
  /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5: 
DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will be 
removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode' instead.
    from werkzeug import url_encode

-- Docs: https://docs.pytest.org/en/latest/warnings.html
==================================================== short test summary info 
=====================================================
XFAIL 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
  This test is failing!
XPASS 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
 The test is flaky with nondeterministic result
FAILED 
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
FAILED 
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
FAILED 
tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
=========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 
warnings in 61.44s (0:01:01) ============================

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to