This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c450cbfbf9 Make default_action_log an RPC function (#38946)
c450cbfbf9 is described below

commit c450cbfbf9e1b2339ae2329b2898b92c6beda527
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Apr 12 14:43:07 2024 -0700

    Make default_action_log an RPC function (#38946)
    
    To use RPC, we need to accept a session, which is provided by the RPC
    call handler. But the action log callback system may already be
    forwarding a session, so to avoid a collision, I have made this an
    internal function instead of making default_action_log an RPC function.
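    
    To make the collision concrete: the callback system invokes action loggers
    with forwarded keyword arguments, so if default_action_log were itself the
    RPC function, the session injected by the RPC call handler could clash
    with a forwarded one. A toy sketch of that failure mode (hypothetical
    names, not the actual handler code):
    
        def rpc_call_handler(func, **params):
            # The RPC call handler injects its own session when invoking
            # the target function.
            return func(session="rpc-session", **params)
    
        def default_action_log(sub_command, session=None, **_):
            print(f"logging {sub_command} with session={session}")
    
        # The callback system may already be forwarding a session of its own:
        forwarded = {"sub_command": "pause", "session": "callback-session"}
    
        try:
            rpc_call_handler(default_action_log, **forwarded)
        except TypeError as e:
            # TypeError: got multiple values for keyword argument 'session'
            print(e)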
---
 airflow/api_internal/endpoints/rpc_api_endpoint.py |  4 +-
 airflow/utils/cli_action_loggers.py                | 80 ++++++++++++++++------
 2 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index c428e8e481..b5fd545066 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -47,12 +47,14 @@ def _initialize_map() -> dict[str, Callable]:
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.models.taskinstance import TaskInstance
     from airflow.secrets.metastore import MetastoreBackend
+    from airflow.utils.cli_action_loggers import _default_action_log_internal
     from airflow.utils.log.file_task_handler import FileTaskHandler
 
     functions: list[Callable] = [
+        _default_action_log_internal,
         _get_template_context,
-        _update_rtif,
         _get_ti_db_access,
+        _update_rtif,
         DagFileProcessor.update_import_errors,
         DagFileProcessor.manage_slas,
         DagFileProcessorManager.deactivate_stale_dags,
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index da56f7db7e..7ac2442a04 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -26,7 +26,13 @@ from __future__ import annotations
 
 import json
 import logging
-from typing import Callable
+from typing import TYPE_CHECKING, Callable
+
+from airflow.api_internal.internal_api_call import internal_api_call
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
 
 logger = logging.getLogger(__name__)
 
@@ -100,32 +106,62 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_
     The difference is this function uses the global ORM session, and pushes a
     ``Log`` row into the database instead of actually logging.
     """
+    _default_action_log_internal(
+        sub_command=sub_command,
+        user=user,
+        task_id=task_id,
+        dag_id=dag_id,
+        execution_date=execution_date,
+        host_name=host_name,
+        full_command=full_command,
+    )
+
+
+@internal_api_call
+@provide_session
+def _default_action_log_internal(
+    *,
+    sub_command,
+    user,
+    task_id,
+    dag_id,
+    execution_date,
+    host_name,
+    full_command,
+    session: Session = NEW_SESSION,
+):
+    """
+    RPC portion of default_action_log.
+
+    To use RPC, we need to accept a session, which is provided by the RPC call handler.
+    But the action log callback system may already be forwarding a session, so to avoid a
+    collision, I have made this an internal function instead of making default_action_log
+    an RPC function.
+    """
     from sqlalchemy.exc import OperationalError, ProgrammingError
 
     from airflow.models.log import Log
     from airflow.utils import timezone
-    from airflow.utils.session import create_session
 
     try:
-        with create_session() as session:
-            extra = json.dumps({"host_name": host_name, "full_command": full_command})
-            # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
-            # on in the CLI
-            session.bulk_insert_mappings(
-                Log,
-                [
-                    {
-                        "event": f"cli_{sub_command}",
-                        "task_instance": None,
-                        "owner": user,
-                        "extra": extra,
-                        "task_id": task_id,
-                        "dag_id": dag_id,
-                        "execution_date": execution_date,
-                        "dttm": timezone.utcnow(),
-                    }
-                ],
-            )
+        # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
+        # on in the CLI
+        session.bulk_insert_mappings(
+            Log,
+            [
+                {
+                    "event": f"cli_{sub_command}",
+                    "task_instance": None,
+                    "owner": user,
+                    "extra": json.dumps({"host_name": host_name, 
"full_command": full_command}),
+                    "task_id": task_id,
+                    "dag_id": dag_id,
+                    "execution_date": execution_date,
+                    "dttm": timezone.utcnow(),
+                }
+            ],
+        )
+        session.commit()
     except (OperationalError, ProgrammingError) as e:
         expected = [
             '"log" does not exist',  # postgres
@@ -135,8 +171,10 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_
         error_is_ok = e.args and any(x in e.args[0] for x in expected)
         if not error_is_ok:
             logger.warning("Failed to log action %s", e)
+        session.rollback()
     except Exception as e:
         logger.warning("Failed to log action %s", e)
+        session.rollback()
 
 
 __pre_exec_callbacks: list[Callable] = []
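
For reference, registering _default_action_log_internal in _initialize_map is
what exposes it to the internal API: the endpoint resolves an incoming method
name against that list of functions. A rough sketch of the dispatch idea
(hypothetical helper names, not the exact endpoint code):

    from typing import Callable

    def _build_method_map(functions: list[Callable]) -> dict[str, Callable]:
        # Key each registered function by its dotted path so a request
        # naming a method can be routed to the matching callable.
        return {f"{f.__module__}.{f.__qualname__}": f for f in functions}

    def dispatch(method_map: dict[str, Callable], method: str, params: dict):
        handler = method_map.get(method)
        if handler is None:
            raise ValueError(f"Unrecognized method: {method}")
        return handler(**params)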
