john-jac opened a new pull request, #66432:
URL: https://github.com/apache/airflow/pull/66432

   ### Description
   
   Adds a new listener hookspec `on_audit_log_created` that fires whenever an 
audit log record is persisted via the API action logging system. This enables 
plugins and providers to react to audit events — forwarding to external 
logging/SIEM systems, triggering alerts, or enriching observability pipelines — 
without polling the REST API.
   
   Ref: #66018
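
A minimal sketch of how such a hookspec and a listener could fit together under pluggy. It assumes the hook receives the persisted record as a keyword argument named `log` and reuses the pluggy project name `"airflow"`. `AuditLogSpec` and `ForwardingListener` are hypothetical names, not the PR's code, and a plain dict stands in for the `Log` model object.

```python
import pluggy

# Project name "airflow" is assumed to match the markers Airflow's listener system uses.
hookspec = pluggy.HookspecMarker("airflow")
hookimpl = pluggy.HookimplMarker("airflow")


class AuditLogSpec:
    """Hypothetical mirror of the new hookspec; the `log` parameter name is assumed."""

    @hookspec
    def on_audit_log_created(self, log):
        """Called after an audit log record has been committed."""


class ForwardingListener:
    """Example listener that collects audit events (stands in for a SIEM forwarder)."""

    def __init__(self) -> None:
        self.forwarded = []

    @hookimpl
    def on_audit_log_created(self, log):
        # A real listener would ship `log` to CloudWatch, Splunk, Kafka, etc.
        self.forwarded.append(log)


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(AuditLogSpec)
listener = ForwardingListener()
pm.register(listener)

# pluggy hooks must be called with keyword arguments.
pm.hook.on_audit_log_created(
    log={"event": "pause_dag", "dag_id": "example_dag", "owner": "alice"}
)
print(listener.forwarded)
```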
   
   ### Motivation
   
   Airflow's listener system currently covers task instances, DAG runs, assets, 
and lifecycle events. Audit logs — the records of *who did what* (paused a DAG, 
modified a variable, deleted a connection) — are the notable gap. Today the 
only way to consume these externally is to poll the `/eventLogs` API endpoint, 
which introduces latency, requires dedicated infrastructure (Lambda, cron job, 
etc.), and doesn't scale well for real-time compliance requirements.
   
   This is a recurring community request:
   
   - **#66018** — Request to push audit logs to CloudWatch for Splunk 
integration; currently requires an external Lambda polling the API.
   - **[Discussion 
#34142](https://github.com/apache/airflow/discussions/34142)** — User attempted 
to read audit logs from within a Listener plugin but had to query the database 
directly, which caused HA lock errors. Maintainer (@Taragolis) confirmed the 
only option is the REST API.
   - **[Discussion 
#24648](https://github.com/apache/airflow/discussions/24648)** — User seeking 
to forward Airflow logs to Splunk without FluentD.
   - **#23880 / #24079** — Added `action_logging` to connection/variable 
endpoints, solving *what* gets audited but not *how to forward* those events 
externally.
   
   This change brings audit logs to parity with other first-class Airflow 
events by exposing them through the same pluggy-based listener mechanism that 
providers and plugins already use.
   
   ### Scope
   
   This PR fires the hook **only from the `action_logging` decorator** — i.e., 
user-initiated API actions (pause/unpause DAGs, create/delete connections, 
modify variables, trigger DAG runs, clear task instances, etc.). It does 
**not** fire for high-frequency system-generated log entries (task state 
transitions, scheduler state mismatches), which are operational telemetry 
rather than audit events and already have dedicated hooks 
(`on_task_instance_running`, `on_task_instance_failed`, etc.).
   
   ### Changes
   
   - New hookspec: `airflow/listeners/spec/audit_log.py`
   - Register it in `airflow/listeners/listener.py`
   - Fire the hook from `api_fastapi/logging/decorators.py` after the DB commit
   - Unit tests
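
The intended ordering in the decorator (persist, commit, then notify) can be sketched with the standard library's `sqlite3`. This is a hypothetical stand-in, not Airflow's `action_logging`: `action_logging_sketch`, `notify_audit_log_created`, the `listeners` list, and the `log` table layout are all invented for illustration.

```python
import functools
import json
import sqlite3
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List

# Hypothetical registry standing in for Airflow's listener manager.
listeners: List[Callable[[Dict[str, Any]], None]] = []


def notify_audit_log_created(record: Dict[str, Any]) -> None:
    """Hypothetical relay: call every registered listener with the record."""
    for listener in listeners:
        listener(record)


def action_logging_sketch(conn: sqlite3.Connection, event: str):
    """Hypothetical decorator: run the endpoint, persist an audit row, commit, then notify."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, owner: str, **kwargs):
            result = func(*args, **kwargs)
            record = {
                "event": event,
                "owner": owner,
                "extra": json.dumps(kwargs),
                "dttm": datetime.now(timezone.utc).isoformat(),
            }
            conn.execute(
                "INSERT INTO log (event, owner, extra, dttm) "
                "VALUES (:event, :owner, :extra, :dttm)",
                record,
            )
            conn.commit()  # the audit row is durable before any listener runs
            notify_audit_log_created(record)
            return result

        return wrapper

    return decorator


# Usage: an in-memory table and a listener that records whether a transaction was open.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (event TEXT, owner TEXT, extra TEXT, dttm TEXT)")
observed = []
listeners.append(lambda rec: observed.append((rec["event"], conn.in_transaction)))


@action_logging_sketch(conn, event="pause_dag")  # event name is illustrative
def pause_dag(dag_id: str) -> str:
    return f"paused {dag_id}"


print(pause_dag(dag_id="example_dag", owner="alice"))
print(observed)
```

The listener sees `conn.in_transaction` as `False`, meaning the audit row had already been committed when it ran.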
   
   ---
   
   ### FAQ
   
   **Q: Why not handle this in a provider package without a core change?**
   
   A: The only current integration point is the REST API, which requires 
external polling infrastructure and introduces latency. As demonstrated in 
[Discussion #34142](https://github.com/apache/airflow/discussions/34142), even 
attempting to read audit logs from within a Listener plugin today requires 
querying the database directly — which causes HA lock errors and is fragile 
across version upgrades. The listener system is Airflow's intended extension 
mechanism for cross-cutting event-driven concerns. Assets, DAG runs, and task 
instances all have listener hooks; audit logs are the missing piece.
   
   **Q: What about performance? This runs on every API mutation.**
   
   A: The hook fires *after* the DB commit, so it cannot affect the persistence 
of the audit record itself. If no listeners are registered, the cost is a 
single no-op function call. API mutations are human-rate operations — orders of 
magnitude less frequent than task heartbeats or scheduler loops.
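
To illustrate the no-listener case, the sketch below declares an assumed mirror of the hookspec with pluggy, registers no implementations, and times the call. The spec class and the `log` parameter name are assumptions; absolute timings depend on the machine and the pluggy version.

```python
import timeit

import pluggy

hookspec = pluggy.HookspecMarker("airflow")


class AuditLogSpec:
    """Assumed mirror of the new hookspec; the `log` parameter name is hypothetical."""

    @hookspec
    def on_audit_log_created(self, log):
        """Called after an audit log record has been committed."""


pm = pluggy.PluginManager("airflow")
pm.add_hookspecs(AuditLogSpec)

# No listener is registered, so the hook has zero implementations to dispatch to.
print(pm.hook.on_audit_log_created.get_hookimpls())
results = pm.hook.on_audit_log_created(log={"event": "pause_dag"})
print(results)

calls = 10_000
seconds = timeit.timeit(
    lambda: pm.hook.on_audit_log_created(log={"event": "pause_dag"}), number=calls
)
print(f"{seconds / calls * 1e6:.2f} microseconds per call with no listeners")
```

Both of the first two prints show an empty list; the per-call figure varies from machine to machine.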
   
   **Q: Why not use a SQLAlchemy `after_insert` event on the `Log` model 
instead?**
   
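   A: SQLAlchemy mapper events such as `after_insert` run during the flush, 
inside the active transaction. A slow listener would hold the transaction (and 
its locks) open and delay the commit, inviting the kind of lock contention and 
deadlocks reported as "HA lock errors" in [Discussion 
#34142](https://github.com/apache/airflow/discussions/34142); a failing listener 
would abort the commit and lose the audit record. Because this hook is invoked 
only after the commit, listeners run outside the transaction boundary.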
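
The difference in failure behavior can be shown with `sqlite3` as a simplified analogy (this is not SQLAlchemy event code): `write_in_transaction` calls the listener before committing, as a flush-time hook would, while `write_then_notify` commits first, as this PR describes. Both function names and `failing_listener` are hypothetical.

```python
import sqlite3
from typing import Callable

Listener = Callable[[str], None]


def failing_listener(event: str) -> None:
    """Simulates a listener whose downstream sink (SIEM, webhook, ...) is down."""
    raise ConnectionError(f"could not forward {event!r}")


def write_in_transaction(conn: sqlite3.Connection, event: str, listener: Listener) -> None:
    """Listener runs before commit, like a flush-time ORM hook would."""
    try:
        conn.execute("INSERT INTO log (event) VALUES (?)", (event,))
        listener(event)  # still inside the open transaction
        conn.commit()
    except Exception:
        conn.rollback()  # the listener's failure discards the audit row as well
        raise


def write_then_notify(conn: sqlite3.Connection, event: str, listener: Listener) -> None:
    """Listener runs only after commit, the ordering described in this PR."""
    conn.execute("INSERT INTO log (event) VALUES (?)", (event,))
    conn.commit()
    listener(event)  # the row is already durable even if this raises


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (event TEXT)")
for writer in (write_in_transaction, write_then_notify):
    try:
        writer(conn, f"via_{writer.__name__}", failing_listener)
    except ConnectionError:
        pass  # a real caller would log or surface this

rows = [row[0] for row in conn.execute("SELECT event FROM log")]
print(rows)
```

Only the row written by `write_then_notify` survives: the in-transaction variant rolls back the audit record together with the failed listener call.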
   
   **Q: Why not just emit to a standard Python `logging.Logger`?**
   
   A: A Python log record typically reaches its handler as a formatted, 
unstructured string, so any structure has to be re-parsed downstream. The `Log` 
model object carries structured fields (`event`, `dag_id`, `task_id`, `owner`, 
`extra`, `dttm`) that consumers need for filtering, routing, and compliance 
reporting.
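
As an illustration of why the structured fields matter, the sketch below turns a record into a JSON payload that a SIEM forwarder could ship, parsing `extra` and deriving a routing key from `dag_id`. `AuditLogRecord` is a hypothetical dataclass standing in for the `Log` model, the assumption that `extra` holds a JSON string is unverified, and the `stream` routing key is invented for this example.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class AuditLogRecord:
    """Hypothetical stand-in for the structured fields of Airflow's `Log` model."""

    event: str
    owner: Optional[str]
    dttm: datetime
    dag_id: Optional[str] = None
    task_id: Optional[str] = None
    extra: Optional[str] = None  # assumed to hold a JSON string


def to_siem_payload(record: AuditLogRecord) -> Dict[str, Any]:
    """Convert a record into a JSON-serializable dict with a derived routing key."""
    payload = asdict(record)
    payload["dttm"] = record.dttm.isoformat()
    # Parse `extra` so downstream filters can query its keys instead of matching text.
    payload["extra"] = json.loads(record.extra) if record.extra else {}
    # Hypothetical routing key: DAG-scoped actions vs. global ones (connections, variables).
    payload["stream"] = "dag" if record.dag_id else "global"
    return payload


record = AuditLogRecord(
    event="pause_dag",  # illustrative event name
    owner="alice",
    dttm=datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc),
    dag_id="example_dag",
    extra='{"is_paused": true}',
)
line = json.dumps(to_siem_payload(record), sort_keys=True)
print(line)
```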
   
   **Q: Why not also fire for task state changes and scheduler events?**
   
   A: Those are high-frequency system-generated events with dedicated hooks 
already. This PR scopes to user-initiated actions — the events that answer "who 
changed what and when" for compliance purposes.
   
   **Q: Who would consume this?**
   
   A: CloudWatch, Datadog, Splunk, Elasticsearch/OpenSearch, Azure Monitor, GCP 
Cloud Logging, OpenTelemetry collectors, custom compliance webhooks, 
Kafka-based audit pipelines.
   
   **Q: Is this a breaking change?**
   
   A: No. Purely additive. Existing deployments are unaffected unless they 
register a listener for the new hook.

