This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 302efad Also track task_log_prefix_template changes (#20435)
302efad is described below
commit 302efad167a54f05642ff163c1319b40d2196970
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Dec 21 20:26:57 2021 +0800
Also track task_log_prefix_template changes (#20435)
---
UPDATING.md | 6 +-
...62e7089_add_task_log_filename_template_model.py | 15 +--
airflow/models/dagrun.py | 35 +++++--
airflow/models/tasklog.py | 12 +--
airflow/utils/cli.py | 4 +-
airflow/utils/db.py | 21 ++--
airflow/utils/helpers.py | 2 +-
.../log/task_handler_with_custom_formatter.py | 30 +++---
docs/apache-airflow/migrations-ref.rst | 3 +-
.../test_task_handler_with_custom_formatter.py | 114 ++++++++++++++-------
tests/www/views/test_views_log.py | 12 +--
11 files changed, 157 insertions(+), 97 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 36c5488..f076451 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -89,11 +89,11 @@ Smart sensors, an "early access" feature added in Airflow
2, are now deprecated
See [Migrating to Deferrable
Operators](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/smart-sensors.html#migrating-to-deferrable-operators)
for details on how to migrate.
-### Task log filenames are not rendered from database entry instead of config
value
+### Task log templates are now read from the metadatabase instead of
`airflow.cfg`
-Previously, filename of a task’s log is dynamically rendered from the ``[core]
log_filename_template`` config value at runtime. This resulted in unfortunate
characteristics like it is inpractical to modify the config value after an
Airflow instance is running for a while, since all existing task logs have be
saved under the previous format and cannot be found with the new config value.
+Previously, a task’s log is dynamically rendered from the `[core]
log_filename_template` and `[core] task_log_prefix_template` config values at
runtime. This resulted in unfortunate characteristics, e.g. it is impractical
to modify the config value after an Airflow instance is running for a while,
since all existing task logs have been saved under the previous format and cannot
be found with the new config value.
-A new `log_filename` table is introduced to solve this problem. This table is
synchronised with the aforementioned config value every time Airflow starts,
and a new field `log_filename_id` is added to every DAG run to point to the
format used by tasks (`NULL` indicates the first ever entry for compatibility).
+A new `log_template` table is introduced to solve this problem. This table is
synchronised with the aforementioned config values every time Airflow starts,
and a new field `log_template_id` is added to every DAG run to point to the
format used by tasks (`NULL` indicates the first ever entry for compatibility).
## Airflow 2.2.2
diff --git
a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
index 21d5bd3..1ed339d 100644
---
a/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
+++
b/airflow/migrations/versions/f9da662e7089_add_task_log_filename_template_model.py
@@ -36,17 +36,18 @@ depends_on = None
def upgrade():
- """Add model for task log filename template and establish fk on task
instance."""
+ """Add model for task log template and establish fk on task instance."""
op.create_table(
- "log_filename",
+ "log_template",
Column("id", Integer, primary_key=True, autoincrement=True),
- Column("template", Text, nullable=False),
+ Column("filename", Text, nullable=False),
+ Column("task_prefix", Text, nullable=False),
Column("created_at", UtcDateTime, nullable=False),
)
dag_run_log_filename_id = Column(
- "log_filename_id",
+ "log_template_id",
Integer,
- ForeignKey("log_filename.id",
name="task_instance_log_filename_id_fkey", ondelete="NO ACTION"),
+ ForeignKey("log_template.id",
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
)
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(dag_run_log_filename_id)
@@ -55,5 +56,5 @@ def upgrade():
def downgrade():
"""Remove fk on task instance and model for task log filename template."""
with op.batch_alter_table("dag_run") as batch_op:
- batch_op.drop_column("log_filename_id")
- op.drop_table("log_filename")
+ batch_op.drop_column("log_template_id")
+ op.drop_table("log_template")
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index eb6001e..4b8cb46 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -45,7 +45,7 @@ from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.models.taskinstance import TaskInstance as TI
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
@@ -96,13 +96,13 @@ class DagRun(Base, LoggingMixin):
# When a scheduler last attempted to schedule TIs for this DagRun
last_scheduling_decision = Column(UtcDateTime)
dag_hash = Column(String(32))
- # Foreign key to LogFilename. DagRun rows created prior to this column's
+ # Foreign key to LogTemplate. DagRun rows created prior to this column's
# existence have this set to NULL. Later rows automatically populate this
on
- # insert to point to the latest LogFilename entry.
- log_filename_id = Column(
+ # insert to point to the latest LogTemplate entry.
+ log_template_id = Column(
Integer,
- ForeignKey("log_filename.id",
name="task_instance_log_filename_id_fkey", ondelete="NO ACTION"),
- default=select([func.max(LogFilename.__table__.c.id)]),
+ ForeignKey("log_template.id",
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
+ default=select([func.max(LogTemplate.__table__.c.id)]),
)
# Remove this `if` after upgrading Sphinx-AutoAPI
@@ -939,14 +939,27 @@ class DagRun(Base, LoggingMixin):
return count
@provide_session
- def get_log_filename_template(self, *, session: Session = NEW_SESSION) ->
Optional[str]:
- if self.log_filename_id is None: # DagRun created before LogFilename
introduction.
- template =
session.query(LogFilename.template).order_by(LogFilename.id).limit(1).scalar()
+ def get_log_filename_template(self, *, session: Session = NEW_SESSION) ->
str:
+ if self.log_template_id is None: # DagRun created before LogTemplate
introduction.
+ template =
session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
else:
- template =
session.query(LogFilename.template).filter_by(id=self.log_filename_id).scalar()
+ template =
session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
if template is None:
raise AirflowException(
- f"No log_filename entry found for ID {self.log_filename_id!r}.
"
+ f"No log_template entry found for ID {self.log_template_id!r}.
"
+ f"Please make sure you set up the metadatabase correctly."
+ )
+ return template
+
+ @provide_session
+ def get_task_prefix_template(self, *, session: Session = NEW_SESSION) ->
str:
+ if self.log_template_id is None: # DagRun created before LogTemplate
introduction.
+ template =
session.query(LogTemplate.task_prefix).order_by(LogTemplate.id).limit(1).scalar()
+ else:
+ template =
session.query(LogTemplate.task_prefix).filter_by(id=self.log_template_id).scalar()
+ if template is None:
+ raise AirflowException(
+ f"No log_template entry found for ID {self.log_template_id!r}.
"
f"Please make sure you set up the metadatabase correctly."
)
return template
diff --git a/airflow/models/tasklog.py b/airflow/models/tasklog.py
index 7b660cb..bfbac42 100644
--- a/airflow/models/tasklog.py
+++ b/airflow/models/tasklog.py
@@ -23,19 +23,19 @@ from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime
-class LogFilename(Base):
- """Model to store ``[core] log_filename_template`` config changes.
+class LogTemplate(Base):
+ """Changes to ``log_filename_template`` and ``task_log_prefix_template``.
This table is automatically populated when Airflow starts up, to store the
config's value if it does not match the last row in the table.
"""
- __tablename__ = "log_filename"
+ __tablename__ = "log_template"
id = Column(Integer, primary_key=True, autoincrement=True)
- template = Column(Text, nullable=False)
+ filename = Column(Text, nullable=False)
+ task_prefix = Column(Text, nullable=False)
created_at = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
def __repr__(self) -> str:
- created_at = self.created_at.isoformat()
- return f"LogFilename(id={self.id!r}, template={self.template!r},
created_at={created_at!r})"
+ return f"LogTemplate(filename={self.filename!r},
task_prefix={self.task_prefix!r})"
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 351862c..db67999 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -34,7 +34,7 @@ from typing import TYPE_CHECKING, Callable, Optional,
TypeVar, cast
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.utils import cli_action_loggers
-from airflow.utils.db import check_and_run_migrations,
synchronize_log_filename_template
+from airflow.utils.db import check_and_run_migrations, synchronize_log_template
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.platform import getuser, is_terminal_support_colors
from airflow.utils.session import provide_session
@@ -94,7 +94,7 @@ def action_cli(func=None, check_db=True):
# Check and run migrations if necessary
if check_db:
check_and_run_migrations()
- synchronize_log_filename_template()
+ synchronize_log_template()
return f(*args, **kwargs)
except Exception as e:
metrics['error'] = e
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 827654f..e2ad98b 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -55,7 +55,7 @@ from airflow.models import ( # noqa: F401
# We need to add this model manually to get reset working well
from airflow.models.serialized_dag import SerializedDagModel # noqa: F401
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
from airflow.utils import helpers
# TODO: remove create_session once we decide to break backward compatibility
@@ -722,17 +722,18 @@ def check_and_run_migrations():
@provide_session
-def synchronize_log_filename_template(*, session: Session = NEW_SESSION) ->
None:
- """Synchronize log filename template config with table.
+def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
+ """Synchronize log template configs with table.
- This checks if the last row (based on timestamp) matches the current
- config value, and insert a new row if not.
+ This checks if the last row fully matches the current config values, and
+ insert a new row if not.
"""
- stored =
session.query(LogFilename.template).order_by(LogFilename.id.desc()).limit(1).scalar()
- config = conf.get("logging", "LOG_FILENAME_TEMPLATE")
- if stored == config:
+ stored = session.query(LogTemplate).order_by(LogTemplate.id.desc()).first()
+ filename = conf.get("logging", "log_filename_template")
+ prefix = conf.get("logging", "task_log_prefix_template")
+ if stored and stored.filename == filename and stored.task_prefix == prefix:
return
- session.merge(LogFilename(template=config))
+ session.merge(LogTemplate(filename=filename, task_prefix=prefix))
def check_conn_id_duplicates(session: Session) -> Iterable[str]:
@@ -1011,7 +1012,7 @@ def upgradedb(session: Session = NEW_SESSION):
log.info("Creating tables")
command.upgrade(config, 'heads')
add_default_pool_if_not_exists()
- synchronize_log_filename_template()
+ synchronize_log_template()
@provide_session
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 611c2e4..1ff57ef 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -171,7 +171,7 @@ def as_flattened_list(iterable: Iterable[Iterable[T]]) ->
List[T]:
return [e for i in iterable for e in i]
-def parse_template_string(template_string):
+def parse_template_string(template_string: str) -> Tuple[Optional[str],
Optional[jinja2.Template]]:
"""Parses Jinja template string."""
if "{{" in template_string: # jinja mode
return None, jinja2.Template(template_string)
diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py
b/airflow/utils/log/task_handler_with_custom_formatter.py
index b7b431b..6227d42 100644
--- a/airflow/utils/log/task_handler_with_custom_formatter.py
+++ b/airflow/utils/log/task_handler_with_custom_formatter.py
@@ -17,39 +17,45 @@
"""Custom logging formatter for Airflow"""
import logging
-from logging import StreamHandler
+from typing import TYPE_CHECKING, Optional
-from airflow.configuration import conf
from airflow.utils.helpers import parse_template_string,
render_template_to_string
+from airflow.utils.session import NEW_SESSION, provide_session
+if TYPE_CHECKING:
+ from jinja2 import Template
+ from sqlalchemy.orm import Session
-class TaskHandlerWithCustomFormatter(StreamHandler):
+ from airflow.models.taskinstance import TaskInstance
+
+
+class TaskHandlerWithCustomFormatter(logging.StreamHandler):
"""Custom implementation of StreamHandler, a class which writes logging
records for Airflow"""
- def __init__(self, stream):
- super().__init__()
- self.prefix_jinja_template = None
+ prefix_jinja_template: Optional["Template"] = None
- def set_context(self, ti):
+ @provide_session
+ def set_context(self, ti, *, session: "Session" = NEW_SESSION) -> None:
"""
Accept the run-time context (i.e. the current task) and configure the
formatter accordingly.
:param ti:
:return:
"""
- if ti.raw:
+ if ti.raw or self.formatter is None:
return
- prefix = conf.get('logging', 'task_log_prefix_template')
+ prefix = ti.get_dagrun().get_task_prefix_template(session=session)
- rendered_prefix = ""
if prefix:
_, self.prefix_jinja_template = parse_template_string(prefix)
rendered_prefix = self._render_prefix(ti)
- formatter = logging.Formatter(rendered_prefix + ":" +
self.formatter._fmt)
+ else:
+ rendered_prefix = ""
+ formatter =
logging.Formatter(f"{rendered_prefix}:{self.formatter._fmt}")
self.setFormatter(formatter)
self.setLevel(self.level)
- def _render_prefix(self, ti):
+ def _render_prefix(self, ti: "TaskInstance") -> str:
if self.prefix_jinja_template:
jinja_context = ti.get_template_context()
return render_template_to_string(self.prefix_jinja_template,
jinja_context)
diff --git a/docs/apache-airflow/migrations-ref.rst
b/docs/apache-airflow/migrations-ref.rst
index 9c5f4d0..e73f29e 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,8 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version |
Description
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``f9da662e7089`` (head) | ``786e3737b18f`` | ``2.3.0`` | Add
``LogFilename`` table to track ``log_filename_template`` value changes.
|
+| ``f9da662e7089`` (head) | ``786e3737b18f`` | ``2.3.0`` | Add
``LogTemplate`` table to track changes to config values
``log_filename_template`` |
+| | | | and
``task_log_prefix_template``.
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``786e3737b18f`` | ``5e3ec427fdd3`` | ``2.3.0`` | Add
``timetable_description`` column to DagModel for UI.
|
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/utils/test_task_handler_with_custom_formatter.py
b/tests/utils/test_task_handler_with_custom_formatter.py
index e2a3c77..23fc299 100644
--- a/tests/utils/test_task_handler_with_custom_formatter.py
+++ b/tests/utils/test_task_handler_with_custom_formatter.py
@@ -16,12 +16,15 @@
# specific language governing permissions and limitations
# under the License.
import logging
-import unittest
+
+import pytest
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
-from airflow.models import DAG, TaskInstance
+from airflow.models import DAG, DagRun, TaskInstance
+from airflow.models.tasklog import LogTemplate
from airflow.operators.dummy import DummyOperator
from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
@@ -29,44 +32,79 @@ from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
DEFAULT_DATE = datetime(2019, 1, 1)
-TASK_LOGGER = 'airflow.task'
TASK_HANDLER = 'task'
TASK_HANDLER_CLASS =
'airflow.utils.log.task_handler_with_custom_formatter.TaskHandlerWithCustomFormatter'
PREV_TASK_HANDLER = DEFAULT_LOGGING_CONFIG['handlers']['task']
+DAG_ID = "task_handler_with_custom_formatter_dag"
+TASK_ID = "task_handler_with_custom_formatter_task"
+
+
[email protected](scope="module", autouse=True)
+def custom_task_log_handler_config():
+ DEFAULT_LOGGING_CONFIG['handlers']['task'] = {
+ 'class': TASK_HANDLER_CLASS,
+ 'formatter': 'airflow',
+ 'stream': 'sys.stdout',
+ }
+ logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+ logging.root.disabled = False
+ yield
+ DEFAULT_LOGGING_CONFIG['handlers']['task'] = PREV_TASK_HANDLER
+ logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+
[email protected]()
+def task_instance():
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+ task = DummyOperator(task_id=TASK_ID, dag=dag)
+ dagrun = dag.create_dagrun(DagRunState.RUNNING,
execution_date=DEFAULT_DATE, run_type=DagRunType.MANUAL)
+ ti = TaskInstance(task=task, run_id=dagrun.run_id)
+ ti.log.disabled = False
+ yield ti
+ clear_db_runs()
+
+
[email protected]()
+def custom_prefix_template(task_instance):
+ run_filters = [DagRun.dag_id == DAG_ID, DagRun.execution_date ==
DEFAULT_DATE]
+ custom_prefix_template = "{{ ti.dag_id }}-{{ ti.task_id }}"
+ with create_session() as session:
+ log_template = session.merge(LogTemplate(filename="irrelevant",
task_prefix=custom_prefix_template))
+ session.flush() # To populate 'log_template.id'.
+ session.query(DagRun).filter(*run_filters).update({"log_template_id":
log_template.id})
+ yield custom_prefix_template
+ with create_session() as session:
+ session.query(DagRun).filter(*run_filters).update({"log_template_id":
None})
+ session.query(LogTemplate).filter(LogTemplate.id ==
log_template.id).delete()
+
+
+def assert_prefix(task_instance: TaskInstance, prefix: str) -> None:
+ handler = next((h for h in task_instance.log.handlers if h.name ==
TASK_HANDLER), None)
+ assert handler is not None, "custom task log handler not set up correctly"
+ assert handler.formatter is not None, "custom task log formatter not set
up correctly"
+ expected_format = f"{prefix}:{handler.formatter._fmt}"
+ set_context(task_instance.log, task_instance)
+ assert expected_format == handler.formatter._fmt
+
+
+def test_custom_formatter_default_format(task_instance):
+ """The default format provides no prefix."""
+ assert_prefix(task_instance, "")
+
+
+@conf_vars({("logging", "task_log_prefix_template"): "this is wrong"})
+def test_custom_formatter_default_format_not_affected_by_config(task_instance):
+ assert_prefix(task_instance, "")
+
+
[email protected]("custom_prefix_template")
+def test_custom_formatter_custom_format(task_instance):
+ """Use the prefix specified from the metadatabase."""
+ assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
+
-class TestTaskHandlerWithCustomFormatter(unittest.TestCase):
- def setUp(self):
- DEFAULT_LOGGING_CONFIG['handlers']['task'] = {
- 'class': TASK_HANDLER_CLASS,
- 'formatter': 'airflow',
- 'stream': 'sys.stdout',
- }
-
- logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
- logging.root.disabled = False
-
- def tearDown(self):
- clear_db_runs()
- DEFAULT_LOGGING_CONFIG['handlers']['task'] = PREV_TASK_HANDLER
-
- @conf_vars({('logging', 'task_log_prefix_template'):
"{{ti.dag_id}}-{{ti.task_id}}"})
- def test_formatter(self):
- dag = DAG('test_dag', start_date=DEFAULT_DATE)
- task = DummyOperator(task_id='test_task', dag=dag)
- dagrun = dag.create_dagrun(
- DagRunState.RUNNING,
- execution_date=DEFAULT_DATE,
- run_type=DagRunType.MANUAL,
- )
- ti = TaskInstance(task=task, run_id=dagrun.run_id)
-
- logger = ti.log
- ti.log.disabled = False
- handler = next((handler for handler in logger.handlers if handler.name
== TASK_HANDLER), None)
- assert handler is not None
-
- # setting the expected value of the formatter
- expected_formatter_value = "test_dag-test_task:" +
handler.formatter._fmt
- set_context(logger, ti)
- assert expected_formatter_value == handler.formatter._fmt
[email protected]("custom_prefix_template")
+@conf_vars({("logging", "task_log_prefix_template"): "this is wrong"})
+def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
+ assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
diff --git a/tests/www/views/test_views_log.py
b/tests/www/views/test_views_log.py
index cafd683..6dc4580 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -28,7 +28,7 @@ import pytest
from airflow import settings
from airflow.config_templates.airflow_local_settings import
DEFAULT_LOGGING_CONFIG
from airflow.models import DagBag, DagRun
-from airflow.models.tasklog import LogFilename
+from airflow.models.tasklog import LogTemplate
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
@@ -263,15 +263,15 @@ def
test_get_logs_for_changed_filename_format_config(log_admin_client):
def dag_run_with_log_filename():
run_filters = [DagRun.dag_id == DAG_ID, DagRun.execution_date ==
DEFAULT_DATE]
with create_session() as session:
- log_filename =
session.merge(LogFilename(template=DIFFERENT_LOG_FILENAME))
- session.flush() # To populate 'log_filename.id'.
+ log_template =
session.merge(LogTemplate(filename=DIFFERENT_LOG_FILENAME,
task_prefix="irrelevant"))
+ session.flush() # To populate 'log_template.id'.
run_query = session.query(DagRun).filter(*run_filters)
- run_query.update({"log_filename_id": log_filename.id})
+ run_query.update({"log_template_id": log_template.id})
dag_run = run_query.one()
yield dag_run
with create_session() as session:
- session.query(DagRun).filter(*run_filters).update({"log_filename_id":
None})
- session.query(LogFilename).filter(LogFilename.id ==
log_filename.id).delete()
+ session.query(DagRun).filter(*run_filters).update({"log_template_id":
None})
+ session.query(LogTemplate).filter(LogTemplate.id ==
log_template.id).delete()
def test_get_logs_for_changed_filename_format_db(log_admin_client,
dag_run_with_log_filename):