This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c450cbfbf9 Make default_action_log an RPC function (#38946)
c450cbfbf9 is described below
commit c450cbfbf9e1b2339ae2329b2898b92c6beda527
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Apr 12 14:43:07 2024 -0700
Make default_action_log an RPC function (#38946)
To use RPC, we need to accept a session, which is provided by the RPC call
handler. But the action log callback system may already be forwarding a
session, so to avoid a collision, I have made the RPC entry point a
separate internal function rather than making default_action_log itself an
RPC function.
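In essence, the change layers the two functions as in the minimal sketch
below (generic stand-ins, not the Airflow code itself; provide_session here
is a toy version of airflow.utils.session.provide_session). The public
callback keeps its session-free signature, so a session forwarded by the
callback system cannot collide with the keyword-only session parameter of
the internal, RPC-capable function:

    import logging
    from typing import Callable

    logger = logging.getLogger(__name__)

    def provide_session(func: Callable) -> Callable:
        # Toy stand-in for airflow.utils.session.provide_session: inject a
        # session only when the caller did not supply one.
        def wrapper(*args, **kwargs):
            kwargs.setdefault("session", object())  # pretend ORM session
            return func(*args, **kwargs)
        return wrapper

    def public_callback(event, **_):
        # Session-free public signature; a "session" kwarg forwarded by the
        # callback system lands in **_ and cannot collide with the
        # keyword-only parameter of the internal function.
        _internal_callback(event=event)

    @provide_session
    def _internal_callback(*, event, session=None):
        # RPC-capable layer: the session comes from provide_session locally,
        # or from the RPC call handler when invoked over the internal API.
        logger.info("would insert Log row for %s using %s", event, session)

    public_callback("cli_test", session="forwarded-by-caller")  # no TypeError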
---
airflow/api_internal/endpoints/rpc_api_endpoint.py | 4 +-
airflow/utils/cli_action_loggers.py | 80 ++++++++++++++++------
2 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c428e8e481..b5fd545066 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -47,12 +47,14 @@ def _initialize_map() -> dict[str, Callable]:
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.secrets.metastore import MetastoreBackend
+ from airflow.utils.cli_action_loggers import _default_action_log_internal
from airflow.utils.log.file_task_handler import FileTaskHandler
functions: list[Callable] = [
+ _default_action_log_internal,
_get_template_context,
- _update_rtif,
_get_ti_db_access,
+ _update_rtif,
DagFileProcessor.update_import_errors,
DagFileProcessor.manage_slas,
DagFileProcessorManager.deactivate_stale_dags,
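(For orientation: _initialize_map exposes each callable in this list over
the internal API, keyed by its dotted path, roughly as in the sketch below.
The body is an assumption; only the function name and return type come from
the hunk header above.)

    from typing import Callable

    def _default_action_log_internal(**kwargs):
        pass  # stand-in; the real function is shown in the next diff

    def _initialize_map() -> dict[str, Callable]:
        # Assumed shape: each callable is exposed under its dotted path, so
        # adding a function to the list above is what makes it reachable by
        # name over the internal RPC API.
        functions: list[Callable] = [_default_action_log_internal]
        return {f"{f.__module__}.{f.__qualname__}": f for f in functions}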
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index da56f7db7e..7ac2442a04 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -26,7 +26,13 @@ from __future__ import annotations
import json
import logging
-from typing import Callable
+from typing import TYPE_CHECKING, Callable
+
+from airflow.api_internal.internal_api_call import internal_api_call
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
@@ -100,32 +106,62 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_
The difference is this function uses the global ORM session, and pushes a
``Log`` row into the database instead of actually logging.
"""
+ _default_action_log_internal(
+ sub_command=sub_command,
+ user=user,
+ task_id=task_id,
+ dag_id=dag_id,
+ execution_date=execution_date,
+ host_name=host_name,
+ full_command=full_command,
+ )
+
+
+@internal_api_call
+@provide_session
+def _default_action_log_internal(
+ *,
+ sub_command,
+ user,
+ task_id,
+ dag_id,
+ execution_date,
+ host_name,
+ full_command,
+ session: Session = NEW_SESSION,
+):
+ """
+ RPC portion of default_action_log.
+
+    To use RPC, we need to accept a session, which is provided by the RPC call handler.
+    But the action log callback system may already be forwarding a session, so to avoid
+    a collision, I have made the RPC entry point a separate internal function rather
+    than making default_action_log itself an RPC function.
+ """
from sqlalchemy.exc import OperationalError, ProgrammingError
from airflow.models.log import Log
from airflow.utils import timezone
- from airflow.utils.session import create_session
try:
- with create_session() as session:
-            extra = json.dumps({"host_name": host_name, "full_command": full_command})
-            # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
-            # on in the CLI
- session.bulk_insert_mappings(
- Log,
- [
- {
- "event": f"cli_{sub_command}",
- "task_instance": None,
- "owner": user,
- "extra": extra,
- "task_id": task_id,
- "dag_id": dag_id,
- "execution_date": execution_date,
- "dttm": timezone.utcnow(),
- }
- ],
- )
+    # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
+    # on in the CLI
+ session.bulk_insert_mappings(
+ Log,
+ [
+ {
+ "event": f"cli_{sub_command}",
+ "task_instance": None,
+ "owner": user,
+                "extra": json.dumps({"host_name": host_name, "full_command": full_command}),
+ "task_id": task_id,
+ "dag_id": dag_id,
+ "execution_date": execution_date,
+ "dttm": timezone.utcnow(),
+ }
+ ],
+ )
+ session.commit()
except (OperationalError, ProgrammingError) as e:
expected = [
'"log" does not exist', # postgres
@@ -135,8 +171,10 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_
error_is_ok = e.args and any(x in e.args[0] for x in expected)
if not error_is_ok:
logger.warning("Failed to log action %s", e)
+ session.rollback()
except Exception as e:
logger.warning("Failed to log action %s", e)
+ session.rollback()
__pre_exec_callbacks: list[Callable] = []
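(For context on why a forwarded session is plausible here: this module fans
the same kwargs out to every registered callback, roughly as in the sketch
below. Only __pre_exec_callbacks appears in the diff; the function bodies
are assumptions about the module's shape.)

    import logging
    from typing import Callable

    logger = logging.getLogger(__name__)

    __pre_exec_callbacks: list[Callable] = []

    def register_pre_exec_callback(action_logger: Callable) -> None:
        # Callbacks registered here run before each CLI command executes.
        __pre_exec_callbacks.append(action_logger)

    def on_pre_execution(**kwargs) -> None:
        # Every callback receives the same kwargs, so anything a caller
        # forwards (including a "session") reaches default_action_log too --
        # hence the session-free public signature above.
        for callback in __pre_exec_callbacks:
            try:
                callback(**kwargs)
            except Exception:
                logger.exception("Failed on pre-execution callback using %s", callback)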