Repository: incubator-airflow Updated Branches: refs/heads/master 9b661fa61 -> 7c3435442
[AIRFLOW-2402] Fix RBAC task log Closes #3319 from yrqls21/kevin_yang_fix_rbac_view Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c343544 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c343544 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c343544 Branch: refs/heads/master Commit: 7c34354427f8c047b2cfb3f472a38bc50fe91d45 Parents: 9b661fa Author: Kevin Yang <[email protected]> Authored: Tue May 29 20:46:01 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Tue May 29 20:46:01 2018 +0100 ---------------------------------------------------------------------- airflow/configuration.py | 4 +- airflow/www/templates/airflow/ti_log.html | 1 - airflow/www/views.py | 11 +- airflow/www_rbac/templates/airflow/ti_log.html | 158 +++++++++++++++---- airflow/www_rbac/views.py | 92 ++++++++--- .../2017-09-01T00.00.00+00.00/1.log | 1 + .../2017-09-01T00.00.00/1.log | 1 - tests/www/test_views.py | 21 ++- .../2017-09-01T00.00.00+00.00/1.log | 1 + .../2017-09-01T00.00.00/1.log | 1 - tests/www_rbac/test_views.py | 117 +++++++++++++- 11 files changed, 337 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index e19a8b1..2ee453f 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -346,7 +346,7 @@ class AirflowConfigParser(ConfigParser): Note: this is not reversible. """ # override any custom settings with defaults - self.defaults.read_string(parameterized_config(DEFAULT_CONFIG)) + self.read_string(parameterized_config(DEFAULT_CONFIG)) # then read test config self.read_string(parameterized_config(TEST_CONFIG)) # then read any "custom" test settings @@ -446,8 +446,10 @@ if not os.path.isfile(AIRFLOW_CONFIG): log.info("Reading the config from %s", AIRFLOW_CONFIG) conf = AirflowConfigParser(default_config=parameterized_config(DEFAULT_CONFIG)) + conf.read(AIRFLOW_CONFIG) + if conf.getboolean('webserver', 'rbac'): with open(os.path.join(_templates_dir, 'default_webserver_config.py')) as f: DEFAULT_WEBSERVER_CONFIG = f.read() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/templates/airflow/ti_log.html ---------------------------------------------------------------------- diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html index 2615c45..2b50061 100644 --- a/airflow/www/templates/airflow/ti_log.html +++ b/airflow/www/templates/airflow/ti_log.html @@ -41,7 +41,6 @@ limitations under the License. </div> {% endblock %} {% block tail %} -{{ lib.form_js() }} {{ super() }} <script> // TODO: make those constants configurable. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 4860dbd..eb29b89 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -733,10 +733,12 @@ class Airflow(BaseView): execution_date = request.args.get('execution_date') dttm = pendulum.parse(execution_date) try_number = int(request.args.get('try_number')) - # metadata may be None metadata = request.args.get('metadata') - if metadata: - metadata = json.loads(metadata) + metadata = json.loads(metadata) + + # metadata may be null + if not metadata: + metadata = {} # Convert string datetime into actual datetime try: @@ -779,9 +781,6 @@ class Airflow(BaseView): .format(task_log_reader, str(e))] metadata['end_of_log'] = True return jsonify(message=error_message, error=True, metadata=metadata) - except AirflowException as e: - metadata['end_of_log'] = True - return jsonify(message=str(e), error=True, metadata=metadata) @expose('/log') @login_required http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/templates/airflow/ti_log.html ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/templates/airflow/ti_log.html b/airflow/www_rbac/templates/airflow/ti_log.html index 79aee89..c873f67 100644 --- a/airflow/www_rbac/templates/airflow/ti_log.html +++ b/airflow/www_rbac/templates/airflow/ti_log.html @@ -1,40 +1,140 @@ {# - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. #} {% extends "airflow/task_instance.html" %} {% block title %}Airflow - DAGs{% endblock %} {% block content %} - {{ super() }} - <h4>{{ title }}</h4> - <ul class="nav nav-pills" role="tablist"> - {% for log in logs %} - <li role="presentation" class="{{ 'active' if loop.last else '' }}"> - <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab"> - {{ loop.index }} - </a> - </li> - {% endfor %} - </ul> - <div class="tab-content"> - {% for log in logs %} - <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}"> - <pre id="attempt-{{ loop.index }}">{{ log }}</pre> - </div> - {% endfor %} +{{ super() }} +<h4>{{ title }}</h4> +<ul class="nav nav-pills" role="tablist"> + {% for log in logs %} + <li role="presentation" class="{{ 'active' if loop.last else '' }}"> + <a href="#{{ loop.index }}" aria-controls="{{ loop.index }}" role="tab" data-toggle="tab"> + {{ loop.index }} + </a> + </li> + {% endfor %} +</ul> +<div class="tab-content"> + {% for log in logs %} + <div role="tabpanel" class="tab-pane {{ 'active' if loop.last else '' }}" id="{{ loop.index }}"> + <img id="loading-{{ loop.index }}" style="margin-top:0%; margin-left:50%; height:50px; width:50px; position: absolute;" + alt="spinner" src="{{ url_for('static', filename='loading.gif') }}"> + <pre><code id="try-{{ loop.index }}">{{ log }}</code></pre> </div> + {% endfor %} + </div> +{% endblock %} +{% block tail %} +{{ super() }} +<script> + // TODO: make those constants configurable. + // Time interval to wait before next log fetching. Default 2s. + const DELAY = 2e3; + // Distance away from page bottom to enable auto tailing. + const AUTO_TAILING_OFFSET = 30; + // Animation speed for auto tailing log display. + const ANIMATION_SPEED = 1000; + // Total number of tabs to show. + const TOTAL_ATTEMPTS = "{{ logs|length }}"; + + // Recursively fetch logs from flask endpoint. + function recurse(delay=DELAY) { + return new Promise((resolve) => setTimeout(resolve, delay)); + } + + // Enable auto tailing only when users scroll down to the bottom + // of the page. This prevent auto tailing the page if users want + // to view earlier rendered messages. + function checkAutoTailingCondition() { + const docHeight = $(document).height(); + console.debug($(window).scrollTop()) + console.debug($(window).height()) + console.debug($(document).height()) + return $(window).scrollTop() != 0 + && ($(window).scrollTop() + $(window).height() > docHeight - AUTO_TAILING_OFFSET); + } + + // Streaming log with auto-tailing. + function autoTailingLog(try_number, metadata=null, auto_tailing=false) { + console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \ + execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata)); + + return Promise.resolve( + $.ajax({ + url: "{{ url_for("Airflow.get_logs_with_metadata") }}", + data: { + dag_id: "{{ dag_id }}", + task_id: "{{ task_id }}", + execution_date: "{{ execution_date }}", + try_number: try_number, + metadata: JSON.stringify(metadata), + }, + })).then(res => { + // Stop recursive call to backend when error occurs. + if (!res) { + document.getElementById("loading-"+try_number).style.display = "none"; + return; + } + // res.error is a boolean + // res.message is the log itself or the error message + if (res.error) { + if (res.message) { + console.error("Error while retrieving log: " + res.message); + } + document.getElementById("loading-"+try_number).style.display = "none"; + return; + } + + if (res.message) { + // Auto scroll window to the end if current window location is near the end. + if(auto_tailing && checkAutoTailingCondition()) { + var should_scroll = true + } + // The message may contain HTML, so either have to escape it or write it as text. + document.getElementById(`try-${try_number}`).textContent += res.message + "\n"; + // Auto scroll window to the end if current window location is near the end. + if(should_scroll) { + $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED); + } + } + + if (res.metadata.end_of_log) { + document.getElementById("loading-"+try_number).style.display = "none"; + return; + } + return recurse().then(() => autoTailingLog( + try_number, res.metadata, auto_tailing)); + }); + } + $(document).ready(function() { + // Lazily load all past task instance logs. + // TODO: We only need to have recursive queries for + // latest running task instances. Currently it does not + // work well with ElasticSearch because ES query only + // returns at most 10k documents. We want the ability + // to display all logs in the front-end. + // An optimization here is to render from latest attempt. + for(let i = TOTAL_ATTEMPTS; i >= 1; i--) { + // Only auto_tailing the page when streaming the latest attempt. + autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS)); + } + }); + +</script> {% endblock %} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/airflow/www_rbac/views.py ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index f508720..ebd1005 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -38,7 +38,7 @@ from sqlalchemy import or_, desc, and_, union_all from flask import ( g, redirect, request, Markup, Response, render_template, - make_response, flash) + make_response, flash, jsonify) from flask._compat import PY2 from flask_appbuilder import BaseView, ModelView, expose, has_access @@ -417,6 +417,65 @@ class Airflow(AirflowBaseView): form=form, title=title, ) + @expose('/get_logs_with_metadata') + @has_access + @action_logging + @provide_session + def get_logs_with_metadata(self, session=None): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + dttm = pendulum.parse(execution_date) + try_number = int(request.args.get('try_number')) + metadata = request.args.get('metadata') + metadata = json.loads(metadata) + + # metadata may be null + if not metadata: + metadata = {} + + # Convert string datetime into actual datetime + try: + execution_date = timezone.parse(execution_date) + except ValueError: + error_message = ( + 'Given execution date, {}, could not be identified ' + 'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format( + execution_date)) + response = jsonify({'error': error_message}) + response.status_code = 400 + + return response + + logger = logging.getLogger('airflow.task') + task_log_reader = conf.get('core', 'task_log_reader') + handler = next((handler for handler in logger.handlers + if handler.name == task_log_reader), None) + + ti = session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, + models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() + try: + if ti is None: + logs = ["*** Task instance did not exist in the DB\n"] + metadata['end_of_log'] = True + else: + dag = dagbag.get_dag(dag_id) + ti.task = dag.get_task(ti.task_id) + logs, metadatas = handler.read(ti, try_number, metadata=metadata) + metadata = metadatas[0] + for i, log in enumerate(logs): + if PY2 and not isinstance(log, unicode): + logs[i] = log.decode('utf-8') + message = logs[0] + return jsonify(message=message, metadata=metadata) + except AttributeError as e: + error_message = ["Task log handler {} does not support read logs.\n{}\n" + .format(task_log_reader, str(e))] + metadata['end_of_log'] = True + return jsonify(message=error_message, error=True, metadata=metadata) + @expose('/log') @has_access @action_logging @@ -428,34 +487,17 @@ class Airflow(AirflowBaseView): dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) - ti = ( - session.query(models.TaskInstance) - .filter(models.TaskInstance.dag_id == dag_id, - models.TaskInstance.task_id == task_id, - models.TaskInstance.execution_date == dttm) - .first() - ) - if ti is None: - logs = ["*** Task instance did not exist in the DB\n"] - else: - logger = logging.getLogger('airflow.task') - task_log_reader = conf.get('core', 'task_log_reader') - handler = next((handler for handler in logger.handlers - if handler.name == task_log_reader), None) - try: - ti.task = dag.get_task(ti.task_id) - logs = handler.read(ti) - except AttributeError as e: - logs = ["Task log handler {} does not support read logs.\n{}\n" - .format(task_log_reader, str(e))] - for i, log in enumerate(logs): - if PY2 and not isinstance(log, unicode): - logs[i] = log.decode('utf-8') + ti = session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, + models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() + logs = [''] * (ti.next_try_number - 1 if ti is not None else 0) return self.render( 'airflow/ti_log.html', - logs=logs, dag=dag, title="Log by attempts", task_id=task_id, + logs=logs, dag=dag, title="Log by attempts", + dag_id=dag.dag_id, task_id=task_id, execution_date=execution_date, form=form) @expose('/task') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log ---------------------------------------------------------------------- diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log new file mode 100644 index 0000000..bc10ef7 --- /dev/null +++ b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log @@ -0,0 +1 @@ +Log for testing. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log ---------------------------------------------------------------------- diff --git a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log deleted file mode 100644 index bc10ef7..0000000 --- a/tests/www/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log +++ /dev/null @@ -1 +0,0 @@ -Log for testing. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 3b2892d..08d72be 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -364,7 +364,6 @@ class TestLogView(unittest.TestCase): def tearDown(self): logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) - dagbag = models.DagBag(settings.DAGS_FOLDER) self.session.query(TaskInstance).filter( TaskInstance.dag_id == self.DAG_ID and TaskInstance.task_id == self.TASK_ID and @@ -400,6 +399,22 @@ class TestLogView(unittest.TestCase): self.assertIn('"message":', response.data.decode('utf-8')) self.assertIn('"metadata":', response.data.decode('utf-8')) + self.assertIn('Log for testing.', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + + def test_get_logs_with_null_metadata(self): + url_template = "/admin/airflow/get_logs_with_metadata?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}&metadata=null" + response = \ + self.app.get(url_template.format(self.DAG_ID, + self.TASK_ID, + quote_plus(self.DEFAULT_DATE.isoformat()), + 1)) + + self.assertIn('"message":', response.data.decode('utf-8')) + self.assertIn('"metadata":', response.data.decode('utf-8')) + self.assertIn('Log for testing.', response.data.decode('utf-8')) self.assertEqual(200, response.status_code) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log new file mode 100644 index 0000000..bc10ef7 --- /dev/null +++ b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00+00.00/1.log @@ -0,0 +1 @@ +Log for testing. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log b/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log deleted file mode 100644 index bc10ef7..0000000 --- a/tests/www_rbac/test_logs/dag_for_testing_log_view/task_for_testing_log_view/2017-09-01T00.00.00/1.log +++ /dev/null @@ -1 +0,0 @@ -Log for testing. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c343544/tests/www_rbac/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index bbec4ac..f6fb24b 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,14 +17,27 @@ # specific language governing permissions and limitations # under the License. +import copy import io +import json +import logging.config +import os +import shutil +import sys +import tempfile import unittest import urllib -from werkzeug.test import Client + from flask._compat import PY2 from flask_appbuilder.security.sqla.models import User as ab_user -from airflow import models +from urllib.parse import quote_plus +from werkzeug.test import Client + from airflow import configuration as conf +from airflow import models +from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.models import DAG, TaskInstance +from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session from airflow.utils import timezone from airflow.utils.state import State @@ -400,6 +413,102 @@ class TestConfigurationView(TestBase): ['Airflow Configuration', 'Running Configuration'], resp) +class TestLogView(TestBase): + DAG_ID = 'dag_for_testing_log_view' + TASK_ID = 'task_for_testing_log_view' + DEFAULT_DATE = timezone.datetime(2017, 9, 1) + ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&' \ + 'execution_date={execution_date}'.format(dag_id=DAG_ID, + task_id=TASK_ID, + execution_date=DEFAULT_DATE) + + def setUp(self): + conf.load_test_config() + + # Create a custom logging configuration + logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) + current_dir = os.path.dirname(os.path.abspath(__file__)) + logging_config['handlers']['task']['base_log_folder'] = os.path.normpath( + os.path.join(current_dir, 'test_logs')) + logging_config['handlers']['task']['filename_template'] = \ + '{{ ti.dag_id }}/{{ ti.task_id }}/' \ + '{{ ts | replace(":", ".") }}/{{ try_number }}.log' + + # Write the custom logging configuration to a file + self.settings_folder = tempfile.mkdtemp() + settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py") + new_logging_file = "LOGGING_CONFIG = {}".format(logging_config) + with open(settings_file, 'w') as handle: + handle.writelines(new_logging_file) + sys.path.append(self.settings_folder) + conf.set('core', 'logging_config_class', 'airflow_local_settings.LOGGING_CONFIG') + + self.app, self.appbuilder = application.create_app(testing=True) + self.app.config['WTF_CSRF_ENABLED'] = False + self.client = self.app.test_client() + self.login() + self.session = Session() + + from airflow.www_rbac.views import dagbag + dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE) + task = DummyOperator(task_id=self.TASK_ID, dag=dag) + dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag) + ti = TaskInstance(task=task, execution_date=self.DEFAULT_DATE) + ti.try_number = 1 + self.session.merge(ti) + self.session.commit() + + def tearDown(self): + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + self.clear_table(TaskInstance) + + shutil.rmtree(self.settings_folder) + conf.set('core', 'logging_config_class', '') + + self.logout() + super(TestLogView, self).tearDown() + + def test_get_file_task_log(self): + response = self.client.get( + TestLogView.ENDPOINT, + follow_redirects=True, + ) + self.assertEqual(response.status_code, 200) + self.assertIn('Log by attempts', + response.data.decode('utf-8')) + + def test_get_logs_with_metadata(self): + url_template = "get_logs_with_metadata?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}&metadata={}" + response = \ + self.client.get(url_template.format(self.DAG_ID, + self.TASK_ID, + quote_plus(self.DEFAULT_DATE.isoformat()), + 1, + json.dumps({})), follow_redirects=True) + + self.assertIn('"message":', response.data.decode('utf-8')) + self.assertIn('"metadata":', response.data.decode('utf-8')) + self.assertIn('Log for testing.', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + + def test_get_logs_with_null_metadata(self): + url_template = "get_logs_with_metadata?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}&metadata=null" + response = \ + self.client.get(url_template.format(self.DAG_ID, + self.TASK_ID, + quote_plus(self.DEFAULT_DATE.isoformat()), + 1), follow_redirects=True) + + self.assertIn('"message":', response.data.decode('utf-8')) + self.assertIn('"metadata":', response.data.decode('utf-8')) + self.assertIn('Log for testing.', response.data.decode('utf-8')) + self.assertEqual(200, response.status_code) + + class TestVersionView(TestBase): def test_version(self): resp = self.client.get('version', follow_redirects=True)
