pierrejeambrun commented on code in PR #45609:
URL: https://github.com/apache/airflow/pull/45609#discussion_r1922539980
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
Review Comment:
I think we should use the `get_name` method of the BaseUser interface for
both fields for now.
We cannot use the auth manager interface methods `get_user_name` and
`get_user_display_name` yet, because the `get_user` method will fail: it
needs a Flask session.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
Review Comment:
This commented-out code should be removed.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
+ except Exception as e:
+ extra_fields["json_error"] = str(e)
+
+ # Merge query parameters and JSON body to extract key fields
+ params = {**query_params, **json_body}
+
+ # Extract relevant fields for logging
+ task_id = params.get("task_id")
+ dag_id = params.get("dag_id")
+ run_id = params.get("run_id") or params.get("dag_run_id")
+ logical_date = params.get("logical_date")
+
+ parsed_logical_date = None
+ if logical_date:
+ try:
+ parsed_date = pendulum.parse(logical_date, strict=False)
+ if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+ parsed_logical_date = parsed_date.isoformat()
+ else:
+ extra_fields["logical_date_error"] = (
+ f"Unsupported type for logical_date:
{type(parsed_date).__name__}"
+ )
Review Comment:
I don't think we need that extra type check. It wasn't in the legacy
version; any failure will just fall back to the `except` clause.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
Review Comment:
We should check whether `"application/json"` is contained in
`request.headers.get("content-type", "")`,
because the header is sometimes more elaborate (e.g. it may include a
charset, as in `application/json; charset=utf-8`). An exact comparison would
then fail even though the payload is still `application/json`.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
+ except Exception as e:
+ extra_fields["json_error"] = str(e)
+
+ # Merge query parameters and JSON body to extract key fields
+ params = {**query_params, **json_body}
+
+ # Extract relevant fields for logging
+ task_id = params.get("task_id")
+ dag_id = params.get("dag_id")
+ run_id = params.get("run_id") or params.get("dag_run_id")
+ logical_date = params.get("logical_date")
+
+ parsed_logical_date = None
+ if logical_date:
+ try:
+ parsed_date = pendulum.parse(logical_date, strict=False)
+ if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+ parsed_logical_date = parsed_date.isoformat()
+ else:
Review Comment:
I believe `is_paused` is now missing from the extra fields.
I think the easiest option is to keep the implementation close to the legacy
`action_logging`, which is proven to work and records all the information
needed.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
Review Comment:
Why is this in the `db` package?
I don't think that makes sense. A `logging` or `decorators` module might be
another option.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
Review Comment:
I think we are only redacting the JSON body, whereas I believe we also used
to redact the query parameters and other request-related fields:
```python
extra_fields = {
k: secrets_masker.redact(v, k)
for k, v in
itertools.chain(request.values.items(multi=True), request.view_args.items())
if k not in fields_skip_logging
}
```
##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -329,7 +329,11 @@ def get_dag_runs(
),
)
def trigger_dag_run(
- dag_id, body: TriggerDAGRunPostBody, request: Request, session: SessionDep
+ dag_id,
+ body: TriggerDAGRunPostBody,
+ request: Request,
+ session: SessionDep,
+ _: None = Depends(action_logging("trigger_dag_run")),
Review Comment:
Dependencies whose return value is not needed can go directly into the
`dependencies` argument of the route decorator.
##########
airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -329,7 +329,11 @@ def get_dag_runs(
),
)
def trigger_dag_run(
- dag_id, body: TriggerDAGRunPostBody, request: Request, session: SessionDep
+ dag_id,
+ body: TriggerDAGRunPostBody,
+ request: Request,
+ session: SessionDep,
+ _: None = Depends(action_logging("trigger_dag_run")),
Review Comment:
We also need to add back action logging for all endpoints, based on the
legacy API action logging.
(Basically, go through the endpoints one by one and add the appropriate
action logger to each of them.)
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -53,6 +53,18 @@ def get_user(token_str: Annotated[str,
Depends(oauth2_scheme)]) -> BaseUser:
raise HTTPException(403, "Forbidden")
+async def get_user_with_exception_handling(request: Request) -> BaseUser |
None:
+ # Currently UI does not support JWT token this method handles if no token
is provided by UI
+ # we can remove this method after issue
https://github.com/apache/airflow/issues/44884 is dome
Review Comment:
```suggestion
# Currently the UI does not support JWT authentication, this method
defines a fallback if no token is provided by the UI.
# We can remove this method when issue
https://github.com/apache/airflow/issues/44884 is done.
```
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
+ except Exception as e:
+ extra_fields["json_error"] = str(e)
+
+ # Merge query parameters and JSON body to extract key fields
+ params = {**query_params, **json_body}
+
+ # Extract relevant fields for logging
+ task_id = params.get("task_id")
+ dag_id = params.get("dag_id")
+ run_id = params.get("run_id") or params.get("dag_run_id")
+ logical_date = params.get("logical_date")
+
+ parsed_logical_date = None
+ if logical_date:
+ try:
+ parsed_date = pendulum.parse(logical_date, strict=False)
+ if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+ parsed_logical_date = parsed_date.isoformat()
+ else:
+ extra_fields["logical_date_error"] = (
+ f"Unsupported type for logical_date:
{type(parsed_date).__name__}"
+ )
+ except pendulum.exceptions.ParserError:
+ extra_fields["logical_date_error"] = f"Invalid logical_date:
{logical_date}"
Review Comment:
Here we changed the behavior by storing the parsing error in a new extra
field, `logical_date_error`. Can you elaborate on why this is needed?
Previously we simply logged the exception:
```python
logger.exception(
"Failed to parse logical_date from the request: %s", logical_date_value
)
```
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
Review Comment:
I believe we still need calls to `_mask_variable_fields` and
`_mask_connection_fields`, because those payloads have a different structure.
For instance, for a variable payload such as `{"key": "something", "value": "the value"}`,
we need to redact `the value` if the value of `key` (i.e. `something`) is a
sensitive field name.
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
+ except Exception as e:
+ extra_fields["json_error"] = str(e)
+
+ # Merge query parameters and JSON body to extract key fields
+ params = {**query_params, **json_body}
+
+ # Extract relevant fields for logging
+ task_id = params.get("task_id")
+ dag_id = params.get("dag_id")
+ run_id = params.get("run_id") or params.get("dag_run_id")
+ logical_date = params.get("logical_date")
+
+ parsed_logical_date = None
+ if logical_date:
+ try:
+ parsed_date = pendulum.parse(logical_date, strict=False)
+ if isinstance(parsed_date, (pendulum.DateTime, pendulum.Date)):
+ parsed_logical_date = parsed_date.isoformat()
+ else:
+ extra_fields["logical_date_error"] = (
+ f"Unsupported type for logical_date:
{type(parsed_date).__name__}"
+ )
+ except pendulum.exceptions.ParserError:
+ extra_fields["logical_date_error"] = f"Invalid logical_date:
{logical_date}"
+
+ # Mask sensitive fields
+ fields_skip_logging = {
+ "csrf_token",
+ "_csrf_token",
+ "is_paused",
+ }
+ extra_fields = {k: v for k, v in extra_fields.items() if k not in
fields_skip_logging}
+
+ # Set event name dynamically if not provided
+ event_name = event or request.url.path.strip("/").replace("/", ".")
Review Comment:
You are using the `path` to build the `event_name`. How will we distinguish a
GET from a POST on, for instance, `/connections`? Both will end up with the
same name: `connections`.
I think we should keep using the function name, or include the HTTP method as
well, e.g. `HTTP_METHOD some.url`.
##########
airflow/api_fastapi/core_api/security.py:
##########
@@ -53,6 +53,18 @@ def get_user(token_str: Annotated[str,
Depends(oauth2_scheme)]) -> BaseUser:
raise HTTPException(403, "Forbidden")
+async def get_user_with_exception_handling(request: Request) -> BaseUser |
None:
+ # Currently UI does not support JWT token this method handles if no token
is provided by UI
+ # we can remove this method after issue
https://github.com/apache/airflow/issues/44884 is dome
+ try:
+ token_str = await oauth2_scheme(request)
+ if not token_str: # Handle None or empty token
+ return None
+ return get_user(token_str)
+ except HTTPException:
Review Comment:
I think we should catch a more precise exception (only the one that is
raised when the auth token is missing).
##########
airflow/api_fastapi/common/db/common.py:
##########
@@ -179,3 +185,91 @@ def paginated_select(
statement = apply_filters_to_select(statement=statement,
filters=[order_by, offset, limit])
return statement, total_entries
+
+
+def action_logging(event: str | None):
+ async def log_action(
+ request: Request,
+ session: SessionDep,
+ user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
+ ):
+ """Log user actions."""
+ if not user:
+ user_name = "anonymous"
+ user_display = ""
+ else:
+ # user_name = user.role
+ # user_display = user.username
+ user_name = getattr(user, "role", "unknown_role")
+ user_display = getattr(user, "username", "unknown_user")
+
+ # Extract basic request details
+ query_params = dict(request.query_params)
+ extra_fields = {
+ "path": request.url.path,
+ "method": request.method,
+ "query_params": query_params,
+ }
+
+ # Add JSON body if present
+ json_body = {}
+ try:
+ if request.headers.get("content-type") == "application/json":
+ json_body = await request.json()
+ extra_fields.update({k: secrets_masker.redact(v, k) for k, v
in json_body.items()})
+ except Exception as e:
+ extra_fields["json_error"] = str(e)
Review Comment:
That's new. Do we need to store redaction-related errors?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]