This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b5d4e319b Render list items in rendered fields view (#32042)
2b5d4e319b is described below
commit 2b5d4e319b82267349aa20cb3230f3b2ad35adce
Author: Clemens <[email protected]>
AuthorDate: Tue Jul 4 05:47:27 2023 +0800
Render list items in rendered fields view (#32042)
---------
Co-authored-by: clemens.valiente <[email protected]>
---
airflow/www/utils.py | 30 ++++++++++---
airflow/www/views.py | 6 +--
tests/www/views/test_views_rendered.py | 78 +++++++++++++++++++++++++++++++++-
3 files changed, 100 insertions(+), 14 deletions(-)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 46256ee335..b31f9326d9 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import json
import textwrap
import time
-from typing import TYPE_CHECKING, Any, Sequence
+from typing import TYPE_CHECKING, Any, Callable, Sequence
from urllib.parse import urlencode
from flask import request, url_for
@@ -36,6 +36,7 @@ from markupsafe import Markup
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
+from pygments.lexer import Lexer
from sqlalchemy import delete, func, types
from sqlalchemy.ext.associationproxy import AssociationProxy
from sqlalchemy.sql import Select
@@ -549,20 +550,35 @@ def pygment_html_render(s, lexer=lexers.TextLexer):
return highlight(s, lexer(), HtmlFormatter(linenos=True))
-def render(obj, lexer):
+def render(obj: Any, lexer: Lexer, handler: Callable[[Any], str] | None = None):
"""Render a given Python object with a given Pygments lexer."""
- out = ""
if isinstance(obj, str):
- out = Markup(pygment_html_render(obj, lexer))
+ return Markup(pygment_html_render(obj, lexer))
+
elif isinstance(obj, (tuple, list)):
+ out = ""
for i, text_to_render in enumerate(obj):
+ if lexer is lexers.PythonLexer:
+ text_to_render = repr(text_to_render)
out += Markup("<div>List item #{}</div>").format(i)
out += Markup("<div>" + pygment_html_render(text_to_render, lexer)
+ "</div>")
+ return out
+
elif isinstance(obj, dict):
+ out = ""
for k, v in obj.items():
+ if lexer is lexers.PythonLexer:
+ v = repr(v)
out += Markup('<div>Dict item "{}"</div>').format(k)
out += Markup("<div>" + pygment_html_render(v, lexer) + "</div>")
- return out
+ return out
+
+ elif handler is not None and obj is not None:
+ return Markup(pygment_html_render(handler(obj), lexer))
+
+ else:
+ # Return empty string otherwise
+ return ""
def json_render(obj, lexer):
@@ -603,8 +619,8 @@ def get_attr_renderer():
"mysql": lambda x: render(x, lexers.MySqlLexer),
"postgresql": lambda x: render(x, lexers.PostgresLexer),
"powershell": lambda x: render(x, lexers.PowerShellLexer),
- "py": lambda x: render(get_python_source(x), lexers.PythonLexer),
- "python_callable": lambda x: render(get_python_source(x),
lexers.PythonLexer),
+ "py": lambda x: render(x, lexers.PythonLexer, get_python_source),
+ "python_callable": lambda x: render(x, lexers.PythonLexer,
get_python_source),
"rst": lambda x: render(x, lexers.RstLexer),
"sql": lambda x: render(x, lexers.SqlLexer),
"tsql": lambda x: render(x, lexers.TransactSqlLexer),
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d791eefc37..90d98a424c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1484,11 +1484,7 @@ class Airflow(AirflowBaseView):
content = getattr(task, template_field)
renderer = task.template_fields_renderers.get(template_field,
template_field)
if renderer in renderers:
- if isinstance(content, (dict, list)):
- json_content = json.dumps(content, sort_keys=True,
indent=4)
- html_dict[template_field] =
renderers[renderer](json_content)
- else:
- html_dict[template_field] = renderers[renderer](content)
+ html_dict[template_field] = renderers[renderer](content)
else:
html_dict[template_field] =
Markup("<pre><code>{}</pre></code>").format(pformat(content))
diff --git a/tests/www/views/test_views_rendered.py
b/tests/www/views/test_views_rendered.py
index 6a2cb9717f..1557a083d5 100644
--- a/tests/www/views/test_views_rendered.py
+++ b/tests/www/views/test_views_rendered.py
@@ -24,7 +24,9 @@ import pytest
from markupsafe import escape
from airflow.models import DAG, RenderedTaskInstanceFields, Variable
+from airflow.models.baseoperator import BaseOperator
from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -64,6 +66,39 @@ def task2(dag):
)
+@pytest.fixture()
+def task3(dag):
+ class TestOperator(BaseOperator):
+ template_fields = ("sql",)
+
+ def __init__(self, *, sql, **kwargs):
+ super().__init__(**kwargs)
+ self.sql = sql
+
+ def execute(self, context):
+ pass
+
+ return TestOperator(
+ task_id="task3",
+ sql=["SELECT 1;", "SELECT 2;"],
+ dag=dag,
+ )
+
+
+@pytest.fixture()
+def task4(dag):
+ def func(*op_args):
+ pass
+
+ return PythonOperator(
+ task_id="task4",
+ python_callable=func,
+ op_args=["{{ task_instance_key_str }}_args"],
+ op_kwargs={"0": "{{ task_instance_key_str }}_kwargs"},
+ dag=dag,
+ )
+
+
@pytest.fixture()
def task_secret(dag):
return BashOperator(
@@ -85,7 +120,7 @@ def init_blank_db():
@pytest.fixture(autouse=True)
-def reset_db(dag, task1, task2, task_secret):
+def reset_db(dag, task1, task2, task3, task4, task_secret):
yield
clear_db_dags()
clear_db_runs()
@@ -93,7 +128,7 @@ def reset_db(dag, task1, task2, task_secret):
@pytest.fixture()
-def create_dag_run(dag, task1, task2, task_secret):
+def create_dag_run(dag, task1, task2, task3, task4, task_secret):
def _create_dag_run(*, execution_date, session):
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
@@ -108,6 +143,10 @@ def create_dag_run(dag, task1, task2, task_secret):
ti2.state = TaskInstanceState.SCHEDULED
ti3 = dag_run.get_task_instance(task_secret.task_id, session=session)
ti3.state = TaskInstanceState.QUEUED
+ ti4 = dag_run.get_task_instance(task3.task_id, session=session)
+ ti4.state = TaskInstanceState.SUCCESS
+ ti5 = dag_run.get_task_instance(task4.task_id, session=session)
+ ti5.state = TaskInstanceState.SUCCESS
session.flush()
return dag_run
@@ -290,3 +329,38 @@ def test_rendered_task_detail_env_secret(patch_app,
admin_client, request, env,
if request.node.callspec.id.endswith("-tpld-var"):
Variable.delete("plain_var")
Variable.delete("secret_var")
+
+
+@pytest.mark.usefixtures("patch_app")
+def test_rendered_template_view_for_list_template_field_args(admin_client,
create_dag_run, task3):
+ """
+ Test that the Rendered View can show a list of syntax-highlighted SQL
statements
+ """
+ assert task3.sql == ["SELECT 1;", "SELECT 2;"]
+
+ with create_session() as session:
+ create_dag_run(execution_date=DEFAULT_DATE, session=session)
+
+ url =
f"rendered-templates?task_id=task3&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"
+
+ resp = admin_client.get(url, follow_redirects=True)
+ check_content_in_response("List item #0", resp)
+ check_content_in_response("List item #1", resp)
+
+
+@pytest.mark.usefixtures("patch_app")
+def test_rendered_template_view_for_op_args(admin_client, create_dag_run,
task4):
+ """
+ Test that the Rendered View can show rendered values in op_args and
op_kwargs
+ """
+ assert task4.op_args == ["{{ task_instance_key_str }}_args"]
+ assert list(task4.op_kwargs.values()) == ["{{ task_instance_key_str }}_kwargs"]
+
+ with create_session() as session:
+ create_dag_run(execution_date=DEFAULT_DATE, session=session)
+
+ url =
f"rendered-templates?task_id=task4&dag_id=testdag&execution_date={quote_plus(str(DEFAULT_DATE))}"
+
+ resp = admin_client.get(url, follow_redirects=True)
+ check_content_in_response("testdag__task4__20200301_args", resp)
+ check_content_in_response("testdag__task4__20200301_kwargs", resp)