tirkarthi opened a new issue, #59965:
URL: https://github.com/apache/airflow/issues/59965

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
The audit log page has been very slow in our environment. We have many 
generated DAGs that share the same task_id across different DAGs. While loading 
the events for a DAG we noticed that the join to the task instance table is 
made only on TaskInstance.task_id, which produces an incorrect join: events 
related to task instances with the same task_id in other DAGs are also 
returned. The generated query is shown below; the `LEFT OUTER JOIN 
(task_instance` keyed only on task_id appears to be the problem, and the 
default behavior should be fixed. The query also loads every column, even 
though dag_id and task_display_name are the only fields required, which could 
be improved.
   
   
https://github.com/apache/airflow/blob/98d59763242fac3fe79561967f226bfb3cfacfdf/airflow-core/src/airflow/models/log.py#L60-L66
   
   ```sql
   SELECT log.id, log.dttm, log.dag_id, log.task_id, log.map_index, log.event, 
log.logical_date, log.run_id, 
   log.owner, log.owner_display_name, log.extra, log.try_number, 
dag_1.dag_display_name, dag_1.deadline, 
   dag_1.dag_id AS dag_id_1, dag_1.is_paused, dag_1.is_stale, 
dag_1.last_parsed_time, 
   dag_1.last_parse_duration, dag_1.last_expired, dag_1.fileloc, 
dag_1.relative_fileloc, dag_1.bundle_name, dag_1.bundle_version, dag_1.owners, 
dag_1.description, dag_1.timetable_summary, dag_1.timetable_description, 
dag_1.asset_expression, dag_1.max_active_tasks, dag_1.max_active_runs, 
dag_1.max_consecutive_failed_dag_runs, dag_1.has_task_concurrency_limits, 
dag_1.has_import_errors, dag_1.fail_fast, dag_1.next_dagrun, 
dag_1.next_dagrun_data_interval_start, dag_1.next_dagrun_data_interval_end, 
dag_1.next_dagrun_create_after, task_instance_1.rendered_map_index, 
task_instance_1.task_display_name, dag_run_1.state, dag_run_1.id AS id_1, 
dag_run_1.dag_id AS dag_id_2, dag_run_1.queued_at, dag_run_1.logical_date AS 
logical_date_1, dag_run_1.start_date, dag_run_1.end_date, dag_run_1.run_id AS 
run_id_1, dag_run_1.creating_job_id, dag_run_1.run_type, 
dag_run_1.triggered_by, dag_run_1.triggering_user_name, dag_run_1.conf, 
dag_run_1.data_interval_start, dag_run_1.data_interval_end, 
dag_run_1.run_after, dag_run_1.last_scheduling_decision, dag_run_1.log_template_id, 
dag_run_1.updated_at, dag_run_1.clear_number, dag_run_1.backfill_id, 
dag_run_1.bundle_version AS bundle_version_1, dag_run_1.scheduled_by_job_id, 
dag_run_1.context_carrier, dag_run_1.span_status, 
dag_run_1.created_dag_version_id, dag_run_1.partition_key, task_instance_1.id 
AS id_2, task_instance_1.task_id AS task_id_1, task_instance_1.dag_id AS 
dag_id_3, task_instance_1.run_id AS run_id_2, task_instance_1.map_index AS 
map_index_1, task_instance_1.start_date AS start_date_1, 
task_instance_1.end_date AS end_date_1, task_instance_1.duration, 
task_instance_1.state AS state_1, task_instance_1.try_number AS try_number_1, 
task_instance_1.max_tries, task_instance_1.hostname, task_instance_1.unixname, 
task_instance_1.pool, task_instance_1.pool_slots, task_instance_1.queue, 
task_instance_1.priority_weight, task_instance_1.operator, 
task_instance_1.custom_operator_name, task_instance_1.queued_dttm, 
task_instance_1.scheduled_dttm, task_instance_1.queued_by_job_id, task_instance_1.last_heartbeat_at, 
task_instance_1.pid, task_instance_1.executor, task_instance_1.executor_config, 
task_instance_1.updated_at AS updated_at_1, task_instance_1.context_carrier AS 
context_carrier_1, task_instance_1.span_status AS span_status_1, 
task_instance_1.external_executor_id, task_instance_1.trigger_id, 
task_instance_1.trigger_timeout, task_instance_1.next_method, 
task_instance_1.next_kwargs, task_instance_1.dag_version_id 
   FROM log LEFT OUTER JOIN dag AS dag_1 ON log.dag_id = dag_1.dag_id LEFT 
OUTER JOIN (task_instance AS task_instance_1 JOIN dag_run AS dag_run_1 ON 
dag_run_1.dag_id = task_instance_1.dag_id AND dag_run_1.run_id = 
task_instance_1.run_id) ON log.task_id = task_instance_1.task_id 
   WHERE log.dag_id = ? AND log.task_id = ? AND log.run_id = ? ORDER BY 
log.dttm DESC, log.id DESC
    LIMIT ? OFFSET ?
   
   ```
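   The fan-out caused by joining only on `task_id` can be demonstrated outside Airflow with a minimal sketch. This uses plain SQLite and tiny hypothetical tables that only mirror the column names in the query above, not the real Airflow schema:

   ```python
   # Minimal sketch (NOT the actual Airflow models): one audit-log row for a
   # task named "start" in log_query_1, with task instances named "start"
   # existing in two DAGs across three runs each.
   import sqlite3

   conn = sqlite3.connect(":memory:")
   conn.executescript("""
   CREATE TABLE log (id INTEGER PRIMARY KEY, dag_id TEXT, task_id TEXT,
                     run_id TEXT, event TEXT);
   CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT,
                               task_display_name TEXT);
   """)
   for dag in ("log_query_1", "log_query_2"):
       for run in ("r1", "r2", "r3"):
           conn.execute(
               "INSERT INTO task_instance VALUES (?, 'start', ?, 'start')",
               (dag, run),
           )
   conn.execute(
       "INSERT INTO log (dag_id, task_id, run_id, event) "
       "VALUES ('log_query_1', 'start', 'r1', 'failed')"
   )

   # Join keyed only on task_id, as in the generated query: the single log
   # row fans out to every task instance named "start", across DAGs and runs.
   bad = conn.execute("""
       SELECT COUNT(*) FROM log
       LEFT OUTER JOIN task_instance ti ON log.task_id = ti.task_id
       WHERE log.dag_id = 'log_query_1'
   """).fetchone()[0]

   # Join keyed on the full identity (dag_id, task_id, run_id): one match.
   good = conn.execute("""
       SELECT COUNT(*) FROM log
       LEFT OUTER JOIN task_instance ti
         ON log.dag_id = ti.dag_id
        AND log.task_id = ti.task_id
        AND log.run_id = ti.run_id
       WHERE log.dag_id = 'log_query_1'
   """).fetchone()[0]

   print(bad, good)  # 6 1
   ```

   Joining on the full identity (plus `map_index` in the real schema) keeps one log row mapped to one task instance; selecting only the columns the page actually renders (e.g. `task_display_name`, `dag_display_name`) rather than entire rows would address the second point.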
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Parse the DAGs below and let them complete 3 runs each.
   2. Go to audit log tab and check the events.
   3. Mark the task instance as failed and reload the page.
   
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.decorators import task
   
   
   with DAG(
       dag_id="log_query_1",
       start_date=datetime(2025, 1, 1),
       end_date=datetime(2025, 1, 3),
       catchup=True,
       schedule="@daily",
   ) as dag:
   
       @task
       def start():
           import time
           time.sleep(1)
   
       start()
   
   
   with DAG(
       dag_id="log_query_2",
       start_date=datetime(2025, 1, 1),
       end_date=datetime(2025, 1, 3),
       catchup=True,
       schedule="@daily",
   ) as dag:
   
       @task
       def start():
           import time
           time.sleep(1)
   
       start()
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   