Repository: incubator-airflow Updated Branches: refs/heads/master 4c6f1fdd6 -> 30b2b633b
[AIRFLOW-2529] Improve graph view performance and usability. Limit the number of DAG runs shown in the drop-down. Add the "base date" and "number of runs" widgets already used in other views, which allow paging through all DAG runs. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5f79465b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5f79465b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5f79465b Branch: refs/heads/master Commit: 5f79465bd8be8b18cce85a8d40a48fdc6b8a81cf Parents: 3ed25a9 Author: Stefan Seelmann <[email protected]> Authored: Mon May 28 21:17:36 2018 +0200 Committer: Stefan Seelmann <[email protected]> Committed: Wed May 30 23:11:53 2018 +0200 ---------------------------------------------------------------------- airflow/www/templates/airflow/graph.html | 4 +- airflow/www/views.py | 33 +++- airflow/www_rbac/templates/airflow/graph.html | 4 +- airflow/www_rbac/views.py | 33 +++- tests/www/test_views.py | 176 ++++++++++++++++++++- tests/www_rbac/test_views.py | 171 +++++++++++++++++++- 6 files changed, 408 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/airflow/www/templates/airflow/graph.html ---------------------------------------------------------------------- diff --git a/airflow/www/templates/airflow/graph.html b/airflow/www/templates/airflow/graph.html index e17c89b..33cdd97 100644 --- a/airflow/www/templates/airflow/graph.html +++ b/airflow/www/templates/airflow/graph.html @@ -37,6 +37,8 @@ <div class="form-inline"> <form method="get" style="float:left;"> {{ state_token }} + Base date: {{ form.base_date(class_="form-control") }} + Number of runs: {{ form.num_runs(class_="form-control") }} Run: {{ form.execution_date(class_="form-control") | safe }} Layout: @@ -50,7 +52,7 @@ <div class="input-group"
style="float: right;"> <input type="text" id="searchbox" class="form-control" placeholder="Search for..." onenter="null"> </div><!-- /input-group --> - <div style="clear: both;"> + <div style="clear: both;"/> </div> <hr/> <div> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index eb29b89..284f7e9 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1356,6 +1356,7 @@ class Airflow(BaseView): @wwwutils.action_logging @provide_session def graph(self, session=None): + default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') dag_id = request.args.get('dag_id') blur = conf.getboolean('webserver', 'demo_mode') dag = dagbag.get_dag(dag_id) @@ -1403,11 +1404,26 @@ class Airflow(BaseView): else: dttm = dag.latest_execution_date or timezone.utcnow() + base_date = request.args.get('base_date') + if base_date: + base_date = timezone.parse(base_date) + else: + # The DateTimeField widget truncates milliseconds and would loose + # the first dag run. Round to next second. 
+ base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0) + + num_runs = request.args.get('num_runs') + num_runs = int(num_runs) if num_runs else default_dag_run + DR = models.DagRun drs = ( session.query(DR) - .filter_by(dag_id=dag_id) - .order_by(desc(DR.execution_date)).all() + .filter( + DR.dag_id == dag.dag_id, + DR.execution_date <= base_date) + .order_by(desc(DR.execution_date)) + .limit(num_runs) + .all() ) dr_choices = [] dr_state = None @@ -1416,7 +1432,13 @@ class Airflow(BaseView): if dttm == dr.execution_date: dr_state = dr.state - class GraphForm(Form): + # Happens if base_date was changed and the selected dag run is not in result + if not dr_state and drs: + dr = drs[0] + dttm = dr.execution_date + dr_state = dr.state + + class GraphForm(DateTimeWithNumRunsForm): execution_date = SelectField("DAG run", choices=dr_choices) arrange = SelectField("Layout", choices=( ('LR', "Left->Right"), @@ -1426,7 +1448,10 @@ class Airflow(BaseView): )) form = GraphForm( - data={'execution_date': dttm.isoformat(), 'arrange': arrange}) + data={'execution_date': dttm.isoformat(), + 'arrange': arrange, + 'base_date': base_date, + 'num_runs': num_runs}) task_instances = { ti.task_id: alchemy_to_dict(ti) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/airflow/www_rbac/templates/airflow/graph.html ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/templates/airflow/graph.html b/airflow/www_rbac/templates/airflow/graph.html index 061cad6..35f79b6 100644 --- a/airflow/www_rbac/templates/airflow/graph.html +++ b/airflow/www_rbac/templates/airflow/graph.html @@ -35,6 +35,8 @@ <div class="form-inline"> <form method="get" style="float:left;"> {{ state_token }} + Base date: {{ form.base_date(class_="form-control") }} + Number of runs: {{ form.num_runs(class_="form-control") }} Run: {{ form.execution_date(class_="form-control") | safe }} Layout: @@ -48,7 +50,7 @@ <div class="input-group" style="float: 
right;"> <input type="text" id="searchbox" class="form-control" placeholder="Search for..." onenter="null"> </div><!-- /input-group --> - <div style="clear: both;"> + <div style="clear: both;"/> </div> <hr/> <div> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/airflow/www_rbac/views.py ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index ebd1005..6f3d650 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -1031,6 +1031,7 @@ class Airflow(AirflowBaseView): @action_logging @provide_session def graph(self, session=None): + default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') dag_id = request.args.get('dag_id') blur = conf.getboolean('webserver', 'demo_mode') dag = dagbag.get_dag(dag_id) @@ -1078,11 +1079,26 @@ class Airflow(AirflowBaseView): else: dttm = dag.latest_execution_date or timezone.utcnow() + base_date = request.args.get('base_date') + if base_date: + base_date = timezone.parse(base_date) + else: + # The DateTimeField widget truncates milliseconds and would loose + # the first dag run. Round to next second. 
+ base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0) + + num_runs = request.args.get('num_runs') + num_runs = int(num_runs) if num_runs else default_dag_run + DR = models.DagRun drs = ( session.query(DR) - .filter_by(dag_id=dag_id) - .order_by(desc(DR.execution_date)).all() + .filter( + DR.dag_id == dag.dag_id, + DR.execution_date <= base_date) + .order_by(desc(DR.execution_date)) + .limit(num_runs) + .all() ) dr_choices = [] dr_state = None @@ -1091,7 +1107,13 @@ class Airflow(AirflowBaseView): if dttm == dr.execution_date: dr_state = dr.state - class GraphForm(Form): + # Happens if base_date was changed and the selected dag run is not in result + if not dr_state and drs: + dr = drs[0] + dttm = dr.execution_date + dr_state = dr.state + + class GraphForm(DateTimeWithNumRunsForm): execution_date = SelectField("DAG run", choices=dr_choices) arrange = SelectField("Layout", choices=( ('LR', "Left->Right"), @@ -1101,7 +1123,10 @@ class Airflow(AirflowBaseView): )) form = GraphForm( - data={'execution_date': dttm.isoformat(), 'arrange': arrange}) + data={'execution_date': dttm.isoformat(), + 'arrange': arrange, + 'base_date': base_date, + 'num_runs': num_runs}) task_instances = { ti.task_id: alchemy_to_dict(ti) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/tests/www/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 08d72be..b115c7c 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -32,7 +32,7 @@ from werkzeug.test import Client from airflow import models, configuration, settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG -from airflow.models import DAG, TaskInstance +from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session from airflow.utils.timezone import datetime @@ -97,7 +97,6 
@@ class TestChartModelView(unittest.TestCase): '/admin/chart?sort=3', follow_redirects=True, ) - print(response.data) self.assertEqual(response.status_code, 200) self.assertIn('Sort by Owner', response.data.decode('utf-8')) @@ -505,5 +504,178 @@ class TestMountPoint(unittest.TestCase): self.assertIn(b"DAGs", resp_html) +class TestGraphView(unittest.TestCase): + DAG_ID = 'dag_for_testing_graph_view' + DEFAULT_DATE = datetime(2017, 9, 1) + RUNS_DATA = [ + ('dag_run_for_testing_graph_view_4', datetime(2018, 4, 4)), + ('dag_run_for_testing_graph_view_3', datetime(2018, 3, 3)), + ('dag_run_for_testing_graph_view_2', datetime(2018, 2, 2)), + ('dag_run_for_testing_graph_view_1', datetime(2018, 1, 1)), + ] + GRAPH_ENDPOINT = '/admin/airflow/graph?dag_id={dag_id}'.format( + dag_id=DAG_ID + ) + + @classmethod + def setUpClass(cls): + super(TestGraphView, cls).setUpClass() + + def setUp(self): + super(TestGraphView, self).setUp() + configuration.load_test_config() + app = application.create_app(testing=True) + app.config['WTF_CSRF_METHODS'] = [] + self.app = app.test_client() + self.session = Session() + from airflow.www.views import dagbag + from airflow.utils.state import State + dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE) + dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag) + self.runs = [] + for rd in self.RUNS_DATA: + run = dag.create_dagrun( + run_id=rd[0], + execution_date=rd[1], + state=State.SUCCESS, + external_trigger=True + ) + self.runs.append(run) + + def tearDown(self): + self.session.query(DagRun).filter( + DagRun.dag_id == self.DAG_ID).delete() + self.session.commit() + self.session.close() + super(TestGraphView, self).tearDown() + + @classmethod + def tearDownClass(cls): + super(TestGraphView, cls).tearDownClass() + + def assertBaseDateAndNumRuns(self, base_date, num_runs, data): + self.assertNotIn('name="base_date" value="{}"'.format(base_date), data) + self.assertNotIn('<option selected="" value="{}">{}</option>'.format( + num_runs, num_runs), 
data) + + def assertRunIsNotInDropdown(self, run, data): + self.assertNotIn(run.execution_date.isoformat(), data) + self.assertNotIn(run.run_id, data) + + def assertRunIsInDropdownNotSelected(self, run, data): + self.assertIn('<option value="{}">{}</option>'.format( + run.execution_date.isoformat(), run.run_id), data) + + def assertRunIsSelected(self, run, data): + self.assertIn('<option selected value="{}">{}</option>'.format( + run.execution_date.isoformat(), run.run_id), data) + + def test_graph_view_default_parameters(self): + """ + Tests graph view with no URL parameter. + Should show all dag runs in the drop down. + Should select the latest dag run. + Should set base date to current date (not asserted) + """ + response = self.app.get( + self.GRAPH_ENDPOINT + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertIn('Base date:', data) + self.assertIn('Number of runs:', data) + self.assertRunIsSelected(self.runs[0], data) + self.assertRunIsInDropdownNotSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_execution_date_parameter_only(self): + """ + Tests graph view with execution_date URL parameter. + Scenario: click link from dag runs view. + Should only show dag runs older than execution_date in the drop down. + Should select the particular dag run. + Should set base date to execution date. 
+ """ + response = self.app.get( + self.GRAPH_ENDPOINT + '&execution_date={}'.format( + self.runs[1].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns( + self.runs[1].execution_date, + configuration.getint('webserver', 'default_dag_run_display_number'), + data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_parmeters_only(self): + """ + Tests graph view with base_date and num_runs URL parameters. + Should only show dag runs older than base_date in the drop down, + limited to num_runs. + Should select the latest dag run. + Should set base date and num runs to submitted values. + """ + response = self.app.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=2'.format( + self.runs[1].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[1].execution_date, 2, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsNotInDropdown(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_and_execution_date_outside(self): + """ + Tests graph view with base_date and num_runs and execution-date URL parameters. + Scenario: change the base date and num runs and press "Go", + the selected execution date is outside the new range. + Should only show dag runs older than base_date in the drop down. + Should select the latest dag run within the range. + Should set base date and num runs to submitted values. 
+ """ + response = self.app.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=42&execution_date={}'.format( + self.runs[1].execution_date.isoformat(), + self.runs[0].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[1].execution_date, 42, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_and_execution_date_within(self): + """ + Tests graph view with base_date and num_runs and execution-date URL parameters. + Scenario: change the base date and num runs and press "Go", + the selected execution date is within the new range. + Should only show dag runs older than base_date in the drop down. + Should select the dag run with the execution date. + Should set base date and num runs to submitted values. 
+ """ + response = self.app.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=5&execution_date={}'.format( + self.runs[2].execution_date.isoformat(), + self.runs[3].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[2].execution_date, 5, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsNotInDropdown(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsSelected(self.runs[3], data) + + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5f79465b/tests/www_rbac/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index f6fb24b..cb81f62 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -36,11 +36,12 @@ from werkzeug.test import Client from airflow import configuration as conf from airflow import models from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG -from airflow.models import DAG, TaskInstance +from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session from airflow.utils import timezone from airflow.utils.state import State +from airflow.utils.timezone import datetime from airflow.www_rbac import app as application @@ -515,5 +516,173 @@ class TestVersionView(TestBase): self.check_content_in_response('Version Info', resp) +class TestGraphView(TestBase): + DAG_ID = 'dag_for_testing_graph_view' + DEFAULT_DATE = datetime(2017, 9, 1) + RUNS_DATA = [ + ('dag_run_for_testing_graph_view_4', datetime(2018, 4, 4)), + ('dag_run_for_testing_graph_view_3', datetime(2018, 3, 3)), + ('dag_run_for_testing_graph_view_2', datetime(2018, 2, 2)), + ('dag_run_for_testing_graph_view_1', datetime(2018, 
1, 1)), + ] + GRAPH_ENDPOINT = '/graph?dag_id={dag_id}'.format( + dag_id=DAG_ID + ) + + @classmethod + def setUpClass(cls): + super(TestGraphView, cls).setUpClass() + + def setUp(self): + super(TestGraphView, self).setUp() + from airflow.www_rbac.views import dagbag + from airflow.utils.state import State + dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE) + dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag) + self.runs = [] + for rd in self.RUNS_DATA: + run = dag.create_dagrun( + run_id=rd[0], + execution_date=rd[1], + state=State.SUCCESS, + external_trigger=True + ) + self.runs.append(run) + + def tearDown(self): + self.session.query(DagRun).filter( + DagRun.dag_id == self.DAG_ID).delete() + self.session.commit() + self.session.close() + super(TestGraphView, self).tearDown() + + @classmethod + def tearDownClass(cls): + super(TestGraphView, cls).tearDownClass() + + def assertBaseDateAndNumRuns(self, base_date, num_runs, data): + self.assertNotIn('name="base_date" value="{}"'.format(base_date), data) + self.assertNotIn('<option selected="" value="{}">{}</option>'.format( + num_runs, num_runs), data) + + def assertRunIsNotInDropdown(self, run, data): + self.assertNotIn(run.execution_date.isoformat(), data) + self.assertNotIn(run.run_id, data) + + def assertRunIsInDropdownNotSelected(self, run, data): + self.assertIn('<option value="{}">{}</option>'.format( + run.execution_date.isoformat(), run.run_id), data) + + def assertRunIsSelected(self, run, data): + self.assertIn('<option selected value="{}">{}</option>'.format( + run.execution_date.isoformat(), run.run_id), data) + + def test_graph_view_default_parameters(self): + """ + Tests graph view with no URL parameter. + Should show all dag runs in the drop down. + Should select the latest dag run. 
+ Should set base date to current date (not asserted) + """ + response = self.client.get( + self.GRAPH_ENDPOINT + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertIn('Base date:', data) + self.assertIn('Number of runs:', data) + self.assertRunIsSelected(self.runs[0], data) + self.assertRunIsInDropdownNotSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_execution_date_parameter_only(self): + """ + Tests graph view with execution_date URL parameter. + Scenario: click link from dag runs view. + Should only show dag runs older than execution_date in the drop down. + Should select the particular dag run. + Should set base date to execution date. + """ + response = self.client.get( + self.GRAPH_ENDPOINT + '&execution_date={}'.format( + self.runs[1].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns( + self.runs[1].execution_date, + conf.getint('webserver', 'default_dag_run_display_number'), + data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_parmeters_only(self): + """ + Tests graph view with base_date and num_runs URL parameters. + Should only show dag runs older than base_date in the drop down, + limited to num_runs. + Should select the latest dag run. + Should set base date and num runs to submitted values. 
+ """ + response = self.client.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=2'.format( + self.runs[1].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[1].execution_date, 2, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsNotInDropdown(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_and_execution_date_outside(self): + """ + Tests graph view with base_date and num_runs and execution-date URL parameters. + Scenario: change the base date and num runs and press "Go", + the selected execution date is outside the new range. + Should only show dag runs older than base_date in the drop down. + Should select the latest dag run within the range. + Should set base date and num runs to submitted values. + """ + response = self.client.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=42&execution_date={}'.format( + self.runs[1].execution_date.isoformat(), + self.runs[0].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[1].execution_date, 42, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsSelected(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsInDropdownNotSelected(self.runs[3], data) + + def test_graph_view_with_base_date_and_num_runs_and_execution_date_within(self): + """ + Tests graph view with base_date and num_runs and execution-date URL parameters. + Scenario: change the base date and num runs and press "Go", + the selected execution date is within the new range. + Should only show dag runs older than base_date in the drop down. + Should select the dag run with the execution date. 
+ Should set base date and num runs to submitted values. + """ + response = self.client.get( + self.GRAPH_ENDPOINT + '&base_date={}&num_runs=5&execution_date={}'.format( + self.runs[2].execution_date.isoformat(), + self.runs[3].execution_date.isoformat()) + ) + self.assertEqual(response.status_code, 200) + data = response.data.decode('utf-8') + self.assertBaseDateAndNumRuns(self.runs[2].execution_date, 5, data) + self.assertRunIsNotInDropdown(self.runs[0], data) + self.assertRunIsNotInDropdown(self.runs[1], data) + self.assertRunIsInDropdownNotSelected(self.runs[2], data) + self.assertRunIsSelected(self.runs[3], data) + + if __name__ == '__main__': unittest.main()
