[ https://issues.apache.org/jira/browse/AIRFLOW-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040604#comment-17040604 ]
ASF GitHub Bot commented on AIRFLOW-6834:
-----------------------------------------
yuqian90 commented on pull request #7470: [AIRFLOW-6834] Fix flaky
test_scheduler_job by sorting TaskInstance
URL: https://github.com/apache/airflow/pull/7470
Fix a flaky test that fails because `TaskInstance` objects are returned in a
non-deterministic order:
```
test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
```
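The PR title says the fix is to sort the `TaskInstance`s so that their order is deterministic. The snippet below is a minimal sketch of that idea, not the actual diff in #7470. It uses plain tuples shaped like the `(dag_id, task_id, execution_date, try_number)` entries in the failing assertion, and `sorted_by_task_id` is a hypothetical helper. This description does not say whether the sort lives in the test or in the scheduler code.

```python
from datetime import datetime, timezone
from operator import itemgetter

# Hypothetical stand-ins for the values used by the test.
DAG_ID = "test_scheduler_process_execute_task_depends_on_past"
DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)
TRY_NUMBER = 1


def sorted_by_task_id(tis):
    """Hypothetical helper: return the collected tuples ordered by task_id."""
    return sorted(tis, key=itemgetter(1))


# Simulate one run where the rows come back in the "wrong" order.
ti_to_schedule = [
    (DAG_ID, "dummy2", DEFAULT_DATE, TRY_NUMBER),
    (DAG_ID, "dummy1", DEFAULT_DATE, TRY_NUMBER),
]
expected = [
    (DAG_ID, "dummy1", DEFAULT_DATE, TRY_NUMBER),
    (DAG_ID, "dummy2", DEFAULT_DATE, TRY_NUMBER),
]

# An order-sensitive comparison fails whenever the row order changes...
order_sensitive_ok = ti_to_schedule == expected
# ...while comparing after a deterministic sort passes regardless of order.
sorted_ok = sorted_by_task_id(ti_to_schedule) == expected

print(order_sensitive_ok, sorted_ok)  # False True
```

Sorting by `task_id` alone is enough in this sketch because both entries share the same `dag_id`, `execution_date` and try number; sorting on the whole tuple would also work.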
---
Issue link: WILL BE INSERTED BY
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
Make sure to mark the boxes below before creating the PR: [x]
- [x] Description above provides context of the change
- [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN =
JIRA ID<sup>*</sup>
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Commits follow "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
<sup>*</sup> For document-only changes commit message can start with
`[AIRFLOW-XXXX]`.
---
In case of a fundamental code change, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards-incompatible changes, please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
for more information.
> Fix flaky test test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6834
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6834
> Project: Apache Airflow
> Issue Type: Bug
> Components: tests
> Affects Versions: 1.10.9
> Reporter: Qian Yu
> Priority: Major
>
> test_scheduler_job.py has a few flaky tests. Some are marked with
> pytest.mark.xfail, but this one is not, and it fails intermittently on
> Travis CI. For example:
>
> {code:python}
> ============================================================ FAILURES ============================================================
> _____________________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_0 ______________________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_0>
> state = None, start_date = None, end_date = None
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> _______________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry _______________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry>
> state = 'up_for_retry', start_date = datetime.datetime(2020, 2, 18, 13, 26,
> 42, 916304, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916315,
> tzinfo=<Timezone [UTC]>)
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ____________ TestDagFileProcessor.test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule _____________
> a = (<tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>,)
> @wraps(func)
> def standalone_func(*a):
> > return func(*(a + p.args), **p.kwargs)
> /usr/local/lib/python3.6/site-packages/parameterized/parameterized.py:518:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> self = <tests.jobs.test_scheduler_job.TestDagFileProcessor
> testMethod=test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule>
> state = 'up_for_reschedule', start_date = datetime.datetime(2020, 2, 18, 13,
> 26, 42, 916318, tzinfo=<Timezone [UTC]>)
> end_date = datetime.datetime(2020, 2, 18, 13, 41, 42, 916321,
> tzinfo=<Timezone [UTC]>)
> @parameterized.expand([
> [State.NONE, None, None],
> [State.UP_FOR_RETRY, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> [State.UP_FOR_RESCHEDULE, timezone.utcnow() -
> datetime.timedelta(minutes=30),
> timezone.utcnow() - datetime.timedelta(minutes=15)],
> ])
> def test_dag_file_processor_process_task_instances_depends_on_past(self,
> state, start_date, end_date):
> """
> Test if _process_task_instances puts the right task instances into the
> mock_list.
> """
> dag = DAG(
> dag_id='test_scheduler_process_execute_task_depends_on_past',
> start_date=DEFAULT_DATE,
> default_args={
> 'depends_on_past': True,
> },
> )
> dag_task1 = DummyOperator(
> task_id='dummy1',
> dag=dag,
> owner='airflow')
> dag_task2 = DummyOperator(
> task_id='dummy2',
> dag=dag,
> owner='airflow')
> with create_session() as session:
> orm_dag = DagModel(dag_id=dag.dag_id)
> session.merge(orm_dag)
> dag_file_processor = DagFileProcessor(dag_ids=[],
> log=mock.MagicMock())
> dag.clear()
> dr = dag_file_processor.create_dag_run(dag)
> self.assertIsNotNone(dr)
> with create_session() as session:
> tis = dr.get_task_instances(session=session)
> for ti in tis:
> ti.state = state
> ti.start_date = start_date
> ti.end_date = end_date
> ti_to_schedule = []
> dag_file_processor._process_task_instances(dag,
> task_instances_list=ti_to_schedule)
> > assert ti_to_schedule == [
> (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER),
> (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER),
> ]
> E AssertionError: assert
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>),\n 1)] ==
> [('test_scheduler_process_execute_task_depends_on_past',\n 'dummy1',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1),\n
> ('test_scheduler_process_execute_task_depends_on_past',\n 'dummy2',\n
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),\n 1)]
> E At index 0 diff:
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy2',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC, GMT,
> +00:00:00, STD]>), 1) !=
> ('test_scheduler_process_execute_task_depends_on_past', 'dummy1',
> datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>), 1)
> E Full diff:
> E [
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy2',
> E ? ^
> E + 'dummy1',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ('test_scheduler_process_execute_task_depends_on_past',
> E - 'dummy1',
> E ? ^
> E + 'dummy2',
> E ? ^
> E - datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<TimezoneInfo [UTC,
> GMT, +00:00:00, STD]>),
> E ? ----
> ---------------------
> E + datetime.datetime(2016, 1, 1, 0, 0, tzinfo=<Timezone [UTC]>),
> E 1),
> E ]
> tests/jobs/test_scheduler_job.py:565: AssertionError
> ======================================================== warnings summary ========================================================
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19
> /usr/local/lib/python3.6/site-packages/flask_babel/__init__.py:19:
> DeprecationWarning: The import 'werkzeug.ImmutableDict' is deprecated and
> will be removed in Werkzeug 1.0. Use 'from werkzeug.datastructures import
> ImmutableDict' instead.
> from werkzeug import ImmutableDict
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5
> /usr/local/lib/python3.6/site-packages/flask_wtf/recaptcha/widgets.py:5:
> DeprecationWarning: The import 'werkzeug.url_encode' is deprecated and will
> be removed in Werkzeug 1.0. Use 'from werkzeug.urls import url_encode'
> instead.
> from werkzeug import url_encode
> -- Docs: https://docs.pytest.org/en/latest/warnings.html
> ==================================================== short test summary info =====================================================
> XFAIL tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_retry_handling_job
> This test is failing!
> XPASS tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_change_state_for_tis_without_dagrun
> The test is flaky with nondeterministic result
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_0
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_1_up_for_retry
> FAILED tests/jobs/test_scheduler_job.py::TestDagFileProcessor::test_dag_file_processor_process_task_instances_depends_on_past_2_up_for_reschedule
> =========================== 3 failed, 86 passed, 1 xfailed, 1 xpassed, 2 warnings in 61.44s (0:01:01) ============================
> {code}
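In the diffs in the log above, pytest also highlights the tzinfo of each datetime (TimezoneInfo [UTC, GMT, +00:00:00, STD] versus Timezone [UTC]). That part is most likely noise. Aware datetimes compare by the instant they represent, so only the swapped dummy1/dummy2 order actually breaks the assertion. Below is a small stdlib illustration. It assumes that both pendulum UTC objects in the log report a zero UTC offset, and it uses datetime.timezone as a stand-in for the pendulum types.

```python
from datetime import datetime, timedelta, timezone

# Two distinct tzinfo objects, both with a zero UTC offset
# (stand-ins for the two pendulum UTC objects seen in the log).
utc = timezone.utc
gmt = timezone(timedelta(0), "GMT")

a = datetime(2016, 1, 1, 0, 0, tzinfo=utc)
b = datetime(2016, 1, 1, 0, 0, tzinfo=gmt)

# The reprs differ, which is what the pytest diff highlights...
reprs_differ = repr(a) != repr(b)
# ...but aware datetimes are compared as UTC instants, so they are equal,
# and so are tuples that contain them.
same_instant = a == b
same_tuple = ("dummy1", a, 1) == ("dummy1", b, 1)

print(reprs_differ, same_instant, same_tuple)  # True True True
```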
--
This message was sent by Atlassian Jira
(v8.3.4#803005)