john-jac opened a new pull request, #66432: URL: https://github.com/apache/airflow/pull/66432
### Description

Adds a new listener hookspec `on_audit_log_created` that fires whenever an audit log record is persisted via the API action logging system. This enables plugins and providers to react to audit events (forwarding to external logging/SIEM systems, triggering alerts, or enriching observability pipelines) without polling the REST API.

Ref: #66018

### Motivation

Airflow's listener system currently covers task instances, DAG runs, assets, and lifecycle events. Audit logs, the records of *who did what* (paused a DAG, modified a variable, deleted a connection), are the notable gap. Today the only way to consume these externally is to poll the `/eventLogs` API endpoint, which introduces latency, requires dedicated infrastructure (Lambda, cron job, etc.), and doesn't scale well for real-time compliance requirements.

This is a recurring community request:

- **#66018**: Request to push audit logs to CloudWatch for Splunk integration; currently requires an external Lambda polling the API.
- **[Discussion #34142](https://github.com/apache/airflow/discussions/34142)**: User attempted to read audit logs from within a Listener plugin but had to query the database directly, which caused HA lock errors. Maintainer (@Taragolis) confirmed the only option is the REST API.
- **[Discussion #24648](https://github.com/apache/airflow/discussions/24648)**: User seeking to forward Airflow logs to Splunk without FluentD.
- **#23880 / #24079**: Added `action_logging` to connection/variable endpoints, solving *what* gets audited but not *how to forward* those events externally.

This change brings audit logs to parity with other first-class Airflow events by exposing them through the same pluggy-based listener mechanism that providers and plugins already use.
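
For illustration, a consumer of the new hook might look roughly like the sketch below. It is not the code in this PR: `AuditRecord` is a stand-in for the `Log` model limited to the structured fields named in the FAQ, `AuditForwarder` and its `sink` callable are hypothetical names, and the `log` parameter name is an assumption about the hookspec signature. A real plugin would presumably mark the method with `hookimpl` from `airflow.listeners` and register the listener through the plugin's `listeners` list.

```python
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


@dataclass
class AuditRecord:
    """Stand-in for Airflow's Log model (assumption: only the fields the FAQ lists)."""

    event: str
    owner: Optional[str]
    dttm: datetime
    dag_id: Optional[str] = None
    task_id: Optional[str] = None
    extra: Optional[str] = None


class AuditForwarder:
    """Hypothetical listener: serializes each audit record and hands it to a sink."""

    def __init__(self, sink: Callable[[str], None]) -> None:
        # The sink could wrap a CloudWatch, Splunk, or Kafka client; here it is any callable.
        self._sink = sink

    # In a real plugin this method would carry Airflow's @hookimpl decorator.
    # The parameter name `log` is an assumption, not taken from the hookspec.
    def on_audit_log_created(self, log: Any) -> None:
        payload = {
            "event": log.event,
            "owner": log.owner,
            "dag_id": log.dag_id,
            "task_id": log.task_id,
            "extra": log.extra,
            "dttm": log.dttm.isoformat(),
        }
        try:
            self._sink(json.dumps(payload))
        except Exception:
            # Record the failure and continue so a broken sink does not
            # propagate an error back into the API request.
            logger.exception("Failed to forward audit record")
```

Passing `list.append` as the sink, for example `AuditForwarder(collected.append)`, is enough to exercise the class locally; each call to `on_audit_log_created` then appends one JSON string to `collected`. Catching sink errors inside the listener is a design choice of this sketch, not something the PR mandates.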

### Scope

This PR fires the hook **only from the `action_logging` decorator**, i.e., user-initiated API actions (pause/unpause DAGs, create/delete connections, modify variables, trigger DAG runs, clear task instances, etc.). It does **not** fire for high-frequency system-generated log entries (task state transitions, scheduler state mismatches), which are operational telemetry rather than audit events and already have dedicated hooks (`on_task_instance_running`, `on_task_instance_failed`, etc.).

### Changes

- New hookspec: `airflow/listeners/spec/audit_log.py`
- Register it in `airflow/listeners/listener.py`
- Fire the hook from `api_fastapi/logging/decorators.py` after the DB commit
- Unit tests

---

### FAQ

**Q: Why not handle this in a provider package without a core change?**

A: The only current integration point is the REST API, which requires external polling infrastructure and introduces latency. As demonstrated in [Discussion #34142](https://github.com/apache/airflow/discussions/34142), even attempting to read audit logs from within a Listener plugin today requires querying the database directly, which causes HA lock errors and is fragile across version upgrades. The listener system is Airflow's intended extension mechanism for cross-cutting event-driven concerns. Assets, DAG runs, and task instances all have listener hooks; audit logs are the missing piece.

**Q: What about performance? This runs on every API mutation.**

A: The hook fires *after* the DB commit, so it cannot affect the persistence of the audit record itself. If no listeners are registered, the cost is a single no-op function call. API mutations are human-rate operations, orders of magnitude less frequent than task heartbeats or scheduler loops.

**Q: Why not use a SQLAlchemy `after_insert` event on the `Log` model instead?**

A: SQLAlchemy ORM events run inside the active transaction.
A slow or failing listener would block the commit or cause deadlocks, exactly the "HA lock errors" reported in [Discussion #34142](https://github.com/apache/airflow/discussions/34142). The pluggy listener system runs outside the transaction boundary.

**Q: Why not just emit to a standard Python `logging.Logger`?**

A: A Python log record is an unstructured string. The `Log` model object carries structured fields (`event`, `dag_id`, `task_id`, `owner`, `extra`, `dttm`) that consumers need for filtering, routing, and compliance reporting.

**Q: Why not also fire for task state changes and scheduler events?**

A: Those are high-frequency system-generated events with dedicated hooks already. This PR scopes to user-initiated actions: the events that answer "who changed what and when" for compliance purposes.

**Q: Who would consume this?**

A: CloudWatch, Datadog, Splunk, Elasticsearch/OpenSearch, Azure Monitor, GCP Cloud Logging, OpenTelemetry collectors, custom compliance webhooks, Kafka-based audit pipelines.

**Q: Is this a breaking change?**

A: No. Purely additive. Existing deployments are unaffected unless they register a listener for the new hook.

--

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
