tirkarthi opened a new issue, #59965: URL: https://github.com/apache/airflow/issues/59965
### Apache Airflow version

main (development)

### If "Other Airflow 3 version" selected, which one?

_No response_

### What happened?

The audit log page has been very slow in our environment. We have many generated DAGs that reuse the same task_id across different DAGs. When loading the events for a DAG, we noticed that the join to the task instance is made on `TaskInstance.task_id` only. This produces an incorrect join in which events related to other task instances with the same task_id are also returned.

In the generated query below, the `LEFT OUTER JOIN (task_instance ...) ON log.task_id = task_instance_1.task_id` clause is the problematic part, and this default behavior should be fixed. The query also loads every column of the joined `dag`, `dag_run` and `task_instance` tables, even though `dag_id` and `task_display_name` are the only fields required. That could be improved as well.

https://github.com/apache/airflow/blob/98d59763242fac3fe79561967f226bfb3cfacfdf/airflow-core/src/airflow/models/log.py#L60-L66

```sql
SELECT log.id, log.dttm, log.dag_id, log.task_id, log.map_index, log.event,
       log.logical_date, log.run_id, log.owner, log.owner_display_name,
       log.extra, log.try_number,
       dag_1.dag_display_name, dag_1.deadline, dag_1.dag_id AS dag_id_1,
       dag_1.is_paused, dag_1.is_stale, dag_1.last_parsed_time,
       dag_1.last_parse_duration, dag_1.last_expired, dag_1.fileloc,
       dag_1.relative_fileloc, dag_1.bundle_name, dag_1.bundle_version,
       dag_1.owners, dag_1.description, dag_1.timetable_summary,
       dag_1.timetable_description, dag_1.asset_expression,
       dag_1.max_active_tasks, dag_1.max_active_runs,
       dag_1.max_consecutive_failed_dag_runs, dag_1.has_task_concurrency_limits,
       dag_1.has_import_errors, dag_1.fail_fast, dag_1.next_dagrun,
       dag_1.next_dagrun_data_interval_start, dag_1.next_dagrun_data_interval_end,
       dag_1.next_dagrun_create_after,
       task_instance_1.rendered_map_index, task_instance_1.task_display_name,
       dag_run_1.state, dag_run_1.id AS id_1, dag_run_1.dag_id AS dag_id_2,
       dag_run_1.queued_at, dag_run_1.logical_date AS logical_date_1,
       dag_run_1.start_date, dag_run_1.end_date, dag_run_1.run_id AS run_id_1,
       dag_run_1.creating_job_id, dag_run_1.run_type, dag_run_1.triggered_by,
       dag_run_1.triggering_user_name, dag_run_1.conf,
       dag_run_1.data_interval_start, dag_run_1.data_interval_end,
       dag_run_1.run_after, dag_run_1.last_scheduling_decision,
       dag_run_1.log_template_id, dag_run_1.updated_at, dag_run_1.clear_number,
       dag_run_1.backfill_id, dag_run_1.bundle_version AS bundle_version_1,
       dag_run_1.scheduled_by_job_id, dag_run_1.context_carrier,
       dag_run_1.span_status, dag_run_1.created_dag_version_id,
       dag_run_1.partition_key,
       task_instance_1.id AS id_2, task_instance_1.task_id AS task_id_1,
       task_instance_1.dag_id AS dag_id_3, task_instance_1.run_id AS run_id_2,
       task_instance_1.map_index AS map_index_1,
       task_instance_1.start_date AS start_date_1,
       task_instance_1.end_date AS end_date_1, task_instance_1.duration,
       task_instance_1.state AS state_1,
       task_instance_1.try_number AS try_number_1, task_instance_1.max_tries,
       task_instance_1.hostname, task_instance_1.unixname, task_instance_1.pool,
       task_instance_1.pool_slots, task_instance_1.queue,
       task_instance_1.priority_weight, task_instance_1.operator,
       task_instance_1.custom_operator_name, task_instance_1.queued_dttm,
       task_instance_1.scheduled_dttm, task_instance_1.queued_by_job_id,
       task_instance_1.last_heartbeat_at, task_instance_1.pid,
       task_instance_1.executor, task_instance_1.executor_config,
       task_instance_1.updated_at AS updated_at_1,
       task_instance_1.context_carrier AS context_carrier_1,
       task_instance_1.span_status AS span_status_1,
       task_instance_1.external_executor_id, task_instance_1.trigger_id,
       task_instance_1.trigger_timeout, task_instance_1.next_method,
       task_instance_1.next_kwargs, task_instance_1.dag_version_id
FROM log
LEFT OUTER JOIN dag AS dag_1 ON log.dag_id = dag_1.dag_id
LEFT OUTER JOIN (task_instance AS task_instance_1
                 JOIN dag_run AS dag_run_1
                   ON dag_run_1.dag_id = task_instance_1.dag_id
                  AND dag_run_1.run_id = task_instance_1.run_id)
  ON log.task_id = task_instance_1.task_id
WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?
ORDER BY log.dttm DESC, log.id DESC
LIMIT ? OFFSET ?
```

### What you think should happen instead?

_No response_

### How to reproduce

1. Parse the DAGs below and let each of them complete 3 runs.
2. Go to the Audit Log tab and check the events.
3. Mark the task instance as failed and reload the page.

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Both DAGs intentionally use the same task_id ("start") so that the audit log
# join on task_id alone matches task instances from the other DAG as well.
with DAG(
    dag_id="log_query_1",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time

        time.sleep(1)

    start()

with DAG(
    dag_id="log_query_2",
    start_date=datetime(2025, 1, 1),
    end_date=datetime(2025, 1, 3),
    catchup=True,
    schedule="@daily",
) as dag:

    @task
    def start():
        import time

        time.sleep(1)

    start()
```

### Operating System

Ubuntu 20.04

### Versions of Apache Airflow Providers

_No response_

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
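The effect of the `ON log.task_id = task_instance_1.task_id` clause can be seen at the SQL level with a small standalone sketch using Python's built-in `sqlite3` module. This is not Airflow's ORM code or its actual fix. The schema is a hypothetical, heavily reduced version of the `log` and `task_instance` tables, and the run IDs `run_1` to `run_3` are placeholders. The sketch compares a join on `task_id` alone, as in the generated query, with a join on the full task instance key `(dag_id, run_id, task_id, map_index)`. That comparison assumes `log.map_index` holds the task instance's map index (-1 for unmapped tasks). Both queries select only `dag_id` and `task_display_name` from `task_instance` rather than every column.

```python
import sqlite3

# Hypothetical, heavily reduced schema: only the columns needed to show the
# join problem. The real Airflow tables have many more columns.
SCHEMA = """
CREATE TABLE log (id INTEGER PRIMARY KEY, dag_id TEXT, task_id TEXT,
                  run_id TEXT, map_index INTEGER, event TEXT);
CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT,
                            map_index INTEGER, task_display_name TEXT);
"""

# Mirrors the problematic ON clause: task instances are matched by task_id only.
TASK_ID_ONLY_JOIN = """
SELECT log.id, task_instance.dag_id, task_instance.task_display_name
FROM log
LEFT OUTER JOIN task_instance ON log.task_id = task_instance.task_id
WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?
"""

# Sketch of a join on the full task instance key (assumes log.map_index is
# populated with the task instance's map index).
FULL_KEY_JOIN = """
SELECT log.id, task_instance.dag_id, task_instance.task_display_name
FROM log
LEFT OUTER JOIN task_instance
    ON log.dag_id = task_instance.dag_id
    AND log.run_id = task_instance.run_id
    AND log.task_id = task_instance.task_id
    AND log.map_index = task_instance.map_index
WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ?
"""


def build_demo_db():
    """Two DAGs, three runs each, both with a task named "start"."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    for dag_id in ("log_query_1", "log_query_2"):
        for n in (1, 2, 3):
            run_id = f"run_{n}"  # placeholder run IDs
            # Unmapped task instances use map_index -1.
            conn.execute(
                "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
                (dag_id, "start", run_id, -1, "start"),
            )
            # One audit log event per task instance.
            conn.execute(
                "INSERT INTO log (dag_id, task_id, run_id, map_index, event) "
                "VALUES (?, ?, ?, ?, ?)",
                (dag_id, "start", run_id, -1, "running"),
            )
    return conn


if __name__ == "__main__":
    conn = build_demo_db()
    params = ("log_query_1", "start", "run_1")
    fanned_out = conn.execute(TASK_ID_ONLY_JOIN, params).fetchall()
    exact = conn.execute(FULL_KEY_JOIN, params).fetchall()
    print(len(fanned_out), len(exact))  # 6 1
```

With data shaped like the reproduction DAGs (two DAGs, three runs each, same task_id), the single log row for `log_query_1` / `run_1` is joined against all six `start` task instances by the task_id-only join, including those from `log_query_2`. The full-key join matches exactly one task instance.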
