pierrejeambrun commented on code in PR #45609:
URL: https://github.com/apache/airflow/pull/45609#discussion_r1922539980


##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")

Review Comment:
   For now, I think we should use the `get_name` method of the `BaseUser` interface for both.
   
   We cannot use the auth manager methods `get_user_name` and `get_user_display_name` yet, because the `get_user` method will fail: it needs a Flask session.
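A minimal sketch of that suggestion. `BaseUser` here is a stand-in for Airflow's interface (only `get_name()` is assumed), and `ExampleUser` and `resolve_user_fields` are hypothetical names used for illustration:

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class BaseUser(ABC):
    """Stand-in for Airflow's BaseUser; only get_name() is assumed here."""

    @abstractmethod
    def get_name(self) -> str: ...


class ExampleUser(BaseUser):
    """Hypothetical concrete user, for illustration only."""

    def __init__(self, name: str) -> None:
        self._name = name

    def get_name(self) -> str:
        return self._name


def resolve_user_fields(user: BaseUser | None) -> tuple[str, str]:
    """Return (owner, owner_display_name), both derived from get_name()."""
    if user is None:
        # Same fallback as the PR for unauthenticated requests.
        return "anonymous", ""
    name = user.get_name()
    return name, name
```

This avoids probing for `role` / `username` attributes with `getattr`, which silently records placeholder values when they are absent.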



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username

Review Comment:
   These commented-out lines should be removed.



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})
+        except Exception as e:
+            extra_fields["json_error"] = str(e)
+
+        # Merge query parameters and JSON body to extract key fields
+        params = {**query_params, **json_body}
+
+        # Extract relevant fields for logging
+        task_id = params.get("task_id")
+        dag_id = params.get("dag_id")
+        run_id = params.get("run_id") or params.get("dag_run_id")
+        logical_date = params.get("logical_date")
+
+        parsed_logical_date = None
+        if logical_date:
+            try:
+                parsed_date = pendulum.parse(logical_date, strict=False)
+                if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+                    parsed_logical_date = parsed_date.isoformat()
+                else:
+                    extra_fields["logical_date_error"] = (
+                        f"Unsupported type for logical_date: {type(parsed_date).__name__}"
+                    )

Review Comment:
   I don't think we need that extra type check. It wasn't in the legacy version; a failure will just fall back to the `except` clause.
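A sketch of the simplified flow, assuming a single `try`/`except` around the parse is enough. To stay self-contained it uses the standard library's `datetime.fromisoformat` in place of `pendulum.parse`, and `parse_logical_date` is a hypothetical helper name:

```python
from __future__ import annotations

from datetime import datetime


def parse_logical_date(value: str | None) -> str | None:
    """Return the ISO-formatted logical date, or None if missing or unparsable.

    There is no isinstance() check: anything that cannot be parsed raises
    and is handled by the single except clause.
    """
    if not value:
        return None
    try:
        # Stand-in for pendulum.parse(value, strict=False).
        return datetime.fromisoformat(value).isoformat()
    except ValueError:
        return None
```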



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":

Review Comment:
   We should check whether `"application/json"` is in `request.headers.get("content-type", "")` instead.
   
   The header is sometimes more elaborate, e.g. it may carry a `charset=utf-8` parameter, so it will not match exactly even though the payload is still `application/json`.
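A minimal sketch of the looser check. `is_json_request` is a hypothetical helper; lower-casing the header is an extra assumption on top of the suggestion, since media types are case-insensitive:

```python
from __future__ import annotations


def is_json_request(content_type: str | None) -> bool:
    """Return True for any application/json content type, parameters included."""
    # Substring check, as suggested: tolerates "application/json; charset=utf-8".
    return "application/json" in (content_type or "").lower()


# Hypothetical usage inside the dependency:
#     if is_json_request(request.headers.get("content-type")):
#         json_body = await request.json()
```

A strict equality test would reject `"application/json; charset=utf-8"`, which is exactly the case described above.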



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})
+        except Exception as e:
+            extra_fields["json_error"] = str(e)
+
+        # Merge query parameters and JSON body to extract key fields
+        params = {**query_params, **json_body}
+
+        # Extract relevant fields for logging
+        task_id = params.get("task_id")
+        dag_id = params.get("dag_id")
+        run_id = params.get("run_id") or params.get("dag_run_id")
+        logical_date = params.get("logical_date")
+
+        parsed_logical_date = None
+        if logical_date:
+            try:
+                parsed_date = pendulum.parse(logical_date, strict=False)
+                if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+                    parsed_logical_date = parsed_date.isoformat()
+                else:

Review Comment:
   I believe `is_paused` is now missing from the extra fields.
   
   I think the easiest option is to keep the implementation close to the legacy `action_logging`, which is proven to work and records all the information needed.
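One possible way to keep `is_paused` while still filtering the generic copy, sketched with a hypothetical `build_extra_fields` helper. It records the value as received; any normalization would be a separate decision not assumed here:

```python
from __future__ import annotations

from typing import Any

FIELDS_SKIP_LOGGING = {"csrf_token", "_csrf_token", "is_paused"}


def build_extra_fields(params: dict[str, Any]) -> dict[str, Any]:
    """Copy loggable params, then add is_paused back explicitly."""
    extra_fields = {k: v for k, v in params.items() if k not in FIELDS_SKIP_LOGGING}
    # is_paused is excluded from the generic copy above, so it has to be
    # re-added on purpose or it silently disappears from the audit log.
    if "is_paused" in params:
        extra_fields["is_paused"] = params["is_paused"]
    return extra_fields
```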



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):

Review Comment:
   Why is this in the `db` package?
   
   I don't think that makes sense. A logging or decorators module might be another option.



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})

Review Comment:
   I think we are only redacting the JSON body, whereas I believe we also used to redact the query parameters and other request-related fields:
   
   ```python
   extra_fields = {
       k: secrets_masker.redact(v, k)
       for k, v in itertools.chain(request.values.items(multi=True), request.view_args.items())
       if k not in fields_skip_logging
   }
   ```
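A rough FastAPI-side equivalent of the legacy snippet, as a sketch: it chains query parameters, path parameters and the JSON body before redacting. `redact` is a stand-in for `secrets_masker.redact` with a made-up list of sensitive names, and `collect_extra_fields` is hypothetical. In the endpoint, the inputs would presumably come from `request.query_params` (whose `multi_items()` is the closest match to Flask's `items(multi=True)`), `request.path_params` and the parsed body:

```python
from __future__ import annotations

import itertools
from typing import Any

SENSITIVE_NAMES = ("password", "secret", "token", "api_key")  # hypothetical
FIELDS_SKIP_LOGGING = {"csrf_token", "_csrf_token", "is_paused"}


def redact(value: Any, name: str) -> Any:
    """Stand-in for secrets_masker.redact(value, name)."""
    return "***" if any(s in name.lower() for s in SENSITIVE_NAMES) else value


def collect_extra_fields(
    query_params: dict[str, Any],
    path_params: dict[str, Any],
    json_body: dict[str, Any],
) -> dict[str, Any]:
    """Redact every request-derived field, not only the JSON body."""
    return {
        k: redact(v, k)
        for k, v in itertools.chain(query_params.items(), path_params.items(), json_body.items())
        if k not in FIELDS_SKIP_LOGGING
    }
```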



##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -329,7 +329,11 @@ def get_dag_runs(
     ),
 )
 def trigger_dag_run(
-    dag_id, body: TriggerDAGRunPostBody, request: Request, session: SessionDep
+    dag_id,
+    body: TriggerDAGRunPostBody,
+    request: Request,
+    session: SessionDep,
+    _: None = Depends(action_logging("trigger_dag_run")),

Review Comment:
   Dependencies whose return value is not needed can go directly into the `dependencies` argument of the route decorator.



##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -329,7 +329,11 @@ def get_dag_runs(
     ),
 )
 def trigger_dag_run(
-    dag_id, body: TriggerDAGRunPostBody, request: Request, session: SessionDep
+    dag_id,
+    body: TriggerDAGRunPostBody,
+    request: Request,
+    session: SessionDep,
+    _: None = Depends(action_logging("trigger_dag_run")),

Review Comment:
   We also need to add back the action logging for all endpoints, based on the legacy API action logging.
   
   (Basically, go through the endpoints one by one and add the appropriate action logger to each of them.)



##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -53,6 +53,18 @@ def get_user(token_str: Annotated[str, Depends(oauth2_scheme)]) -> BaseUser:
         raise HTTPException(403, "Forbidden")
 
 
+async def get_user_with_exception_handling(request: Request) -> BaseUser | None:
+    # Currently UI does not support JWT token this method handles if no token is provided by UI
+    # we can remove this method after issue https://github.com/apache/airflow/issues/44884 is dome

Review Comment:
   ```suggestion
       # Currently the UI does not support JWT authentication; this method defines a fallback if no token is provided by the UI.
       # We can remove this method when issue https://github.com/apache/airflow/issues/44884 is done.
   ```



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})
+        except Exception as e:
+            extra_fields["json_error"] = str(e)
+
+        # Merge query parameters and JSON body to extract key fields
+        params = {**query_params, **json_body}
+
+        # Extract relevant fields for logging
+        task_id = params.get("task_id")
+        dag_id = params.get("dag_id")
+        run_id = params.get("run_id") or params.get("dag_run_id")
+        logical_date = params.get("logical_date")
+
+        parsed_logical_date = None
+        if logical_date:
+            try:
+                parsed_date = pendulum.parse(logical_date, strict=False)
+                if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+                    parsed_logical_date = parsed_date.isoformat()
+                else:
+                    extra_fields["logical_date_error"] = (
+                        f"Unsupported type for logical_date: {type(parsed_date).__name__}"
+                    )
+            except pendulum.exceptions.ParserError:
+                extra_fields["logical_date_error"] = f"Invalid logical_date: {logical_date}"

Review Comment:
   Here we changed the behavior by storing the parsing error in a new extra field, `logical_date_error`. Can you elaborate on why this is needed?
   
   Before, we simply logged the exception:
   ```python
   logger.exception(
       "Failed to parse logical_date from the request: %s", logical_date_value
   )
   ```
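A sketch of the legacy-style behavior, where a parse failure is logged with its traceback and the extra fields stay untouched. `parse_logical_date_or_log` is a hypothetical name, and `datetime.fromisoformat` stands in for `pendulum.parse` to keep the example self-contained:

```python
import logging
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)


def parse_logical_date_or_log(value: str) -> Optional[str]:
    """Parse the date; on failure, log the traceback instead of storing it."""
    try:
        # Stand-in for pendulum.parse(value, strict=False).
        return datetime.fromisoformat(value).isoformat()
    except ValueError:
        logger.exception("Failed to parse logical_date from the request: %s", value)
        return None
```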



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})

Review Comment:
   I believe we still need the calls to `_mask_variable_fields` and `_mask_connection_fields`, because those payloads have a different structure.
   
   For instance, for variables (`{"key": "something", "value": "the value"}`), we need to redact `the value` if the value of `key`, i.e. `something`, is a sensitive field name.
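A hypothetical sketch of why a key/value payload needs its own masking step. `mask_variable_fields` and `is_sensitive` are illustrative names (not Airflow's `_mask_variable_fields`), and the list of sensitive name fragments is made up:

```python
from typing import Any, Dict

# Hypothetical list; Airflow has its own sensitive-name configuration.
SENSITIVE_KEY_PARTS = ("password", "secret", "token", "api_key")


def is_sensitive(name: str) -> bool:
    return any(part in name.lower() for part in SENSITIVE_KEY_PARTS)


def mask_variable_fields(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Redact `value` when the content of `key` names a secret.

    A per-field redact(v, k) pass cannot catch this: it only sees the
    literal field names "key" and "value", never the variable's own name.
    """
    masked = dict(fields)
    if "value" in masked and is_sensitive(str(masked.get("key", ""))):
        masked["value"] = "***"
    return masked
```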



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})
+        except Exception as e:
+            extra_fields["json_error"] = str(e)
+
+        # Merge query parameters and JSON body to extract key fields
+        params = {**query_params, **json_body}
+
+        # Extract relevant fields for logging
+        task_id = params.get("task_id")
+        dag_id = params.get("dag_id")
+        run_id = params.get("run_id") or params.get("dag_run_id")
+        logical_date = params.get("logical_date")
+
+        parsed_logical_date = None
+        if logical_date:
+            try:
+                parsed_date = pendulum.parse(logical_date, strict=False)
+                if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+                    parsed_logical_date = parsed_date.isoformat()
+                else:
+                    extra_fields["logical_date_error"] = (
+                        f"Unsupported type for logical_date: {type(parsed_date).__name__}"
+                    )
+            except pendulum.exceptions.ParserError:
+                extra_fields["logical_date_error"] = f"Invalid logical_date: {logical_date}"
+
+        # Mask sensitive fields
+        fields_skip_logging = {
+            "csrf_token",
+            "_csrf_token",
+            "is_paused",
+        }
+        extra_fields = {k: v for k, v in extra_fields.items() if k not in fields_skip_logging}
+
+        # Set event name dynamically if not provided
+        event_name = event or request.url.path.strip("/").replace("/", ".")

Review Comment:
   You are using the `path` to get the `event_name`. How will we distinguish a GET from a POST on, for instance, `/connections`? Both will end up with the same name: `connections`.
   
   I think we should keep using the function name, or log the method as well: `HTTP_METHOD some.url`.
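A minimal sketch of the "log the method as well" option. `build_event_name` is a hypothetical helper that reuses the PR's path-to-dotted-name conversion:

```python
def build_event_name(method: str, path: str) -> str:
    """Prefix the dotted path with the HTTP method, e.g. 'POST connections'."""
    dotted = path.strip("/").replace("/", ".")
    return f"{method.upper()} {dotted}"
```

With this, `GET /connections` and `POST /connections` produce distinct event names.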



##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -53,6 +53,18 @@ def get_user(token_str: Annotated[str, Depends(oauth2_scheme)]) -> BaseUser:
         raise HTTPException(403, "Forbidden")
 
 
+async def get_user_with_exception_handling(request: Request) -> BaseUser | None:
+    # Currently UI does not support JWT token this method handles if no token is provided by UI
+    # we can remove this method after issue https://github.com/apache/airflow/issues/44884 is dome
+    try:
+        token_str = await oauth2_scheme(request)
+        if not token_str:  # Handle None or empty token
+            return None
+        return get_user(token_str)
+    except HTTPException:

Review Comment:
   I think we should catch a more precise exception (only the one that is raised when the auth token is missing).



##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
     statement = apply_filters_to_select(statement=statement, filters=[order_by, offset, limit])
 
     return statement, total_entries
+
+
+def action_logging(event: str | None):
+    async def log_action(
+        request: Request,
+        session: SessionDep,
+        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+    ):
+        """Log user actions."""
+        if not user:
+            user_name = "anonymous"
+            user_display = ""
+        else:
+            # user_name = user.role
+            # user_display = user.username
+            user_name = getattr(user, "role", "unknown_role")
+            user_display = getattr(user, "username", "unknown_user")
+
+        # Extract basic request details
+        query_params = dict(request.query_params)
+        extra_fields = {
+            "path": request.url.path,
+            "method": request.method,
+            "query_params": query_params,
+        }
+
+        # Add JSON body if present
+        json_body = {}
+        try:
+            if request.headers.get("content-type") == "application/json":
+                json_body = await request.json()
+                extra_fields.update({k: secrets_masker.redact(v, k) for k, v in json_body.items()})
+        except Exception as e:
+            extra_fields["json_error"] = str(e)

Review Comment:
   That's new. Do we need to store redaction-related errors?



-- 
This is an automated message from the Apache Git Service.