yuqian90 commented on pull request #14048:
URL: https://github.com/apache/airflow/pull/14048#issuecomment-775124153


   Hi @kaxil this is the experiment I did.
`test_create_large_dag_with_task_reschedule` creates a lot of `TaskInstance` and
`TaskReschedule` rows in the db. Then `test_clear_large_dag_with_task_reschedule`
times how long it takes to call `clear_task_instances` on them.
   
   ```python
   import pytest
   
   @pytest.fixture(scope="module")
   def big_dag():
       """Module-scoped DAG holding 1000 reschedule-mode PythonSensor tasks."""
       dag = DAG(
           'test_create_large_dag_with_task_reschedule',
           start_date=DEFAULT_DATE,
           end_date=DEFAULT_DATE + datetime.timedelta(days=30),
       )
       with dag:
           for idx in range(1000):
               PythonSensor(
                   task_id=f'task_{idx}',
                   python_callable=lambda: False,
                   mode="reschedule",
               )
       yield dag
   
   import copy
   
   def test_create_large_dag_with_task_reschedule(big_dag):
       """Populate the DB with TaskInstance/TaskReschedule rows for *big_dag*.

       Creates 10 scheduled dag runs, one TaskInstance per task per run, and
       5 TaskReschedule rows per TaskInstance, bulk-inserted via the Core
       insert API in chunks of 10000 rows.
       """
       import pendulum

       task_instances = []
       for day in range(10):
           run_date = DEFAULT_DATE + datetime.timedelta(days=day)
           big_dag.create_dagrun(
               execution_date=run_date,
               state=State.RUNNING,
               run_type=DagRunType.SCHEDULED,
           )
           # copy.copy so each TI gets its own task object for this run
           task_instances.extend(
               TI(task=copy.copy(task), execution_date=run_date)
               for task in big_dag.tasks
           )

       reschedules = []
       for ti in task_instances:
           for _ in range(5):
               reschedules.append(
                   TaskReschedule(
                       task=ti.task,
                       execution_date=ti.execution_date,
                       try_number=1,
                       start_date=pendulum.now(),
                       end_date=pendulum.now(),
                       reschedule_date=pendulum.now(),
                   )
               )

       # Plain dicts for a Core bulk insert -- much faster than ORM adds.
       rows = [
           {
               "task_id": tr.task_id,
               "dag_id": tr.dag_id,
               "execution_date": tr.execution_date,
               "try_number": tr.try_number,
               "start_date": tr.start_date,
               "end_date": tr.end_date,
               "duration": tr.duration,
               "reschedule_date": tr.reschedule_date,
           }
           for tr in reschedules
       ]

       with create_session() as session:
           # Insert in 10000-row chunks, committing after each chunk.
           for offset in range(0, len(rows), 10000):
               session.execute(
                   TaskReschedule.__table__.insert(),
                   rows[offset:offset + 10000],
               )
               session.commit()
   
   def test_clear_large_dag_with_task_reschedule(big_dag):
       """Time clear_task_instances() over the rows created by the previous test."""
       import time

       with create_session() as session:
           def _reschedule_count():
               # Rows for this dag at try_number 1 (all rows were inserted with 1).
               query = session.query(TaskReschedule).filter(
                   TaskReschedule.dag_id == big_dag.dag_id,
                   TaskReschedule.try_number == 1,
               )
               return query.count()

           # 1000 tasks x 10 runs x 5 reschedules each.
           assert _reschedule_count() == 1000 * 10 * 5
           tis = session.query(TI).filter(TI.dag_id == big_dag.dag_id).all()
           start = time.time()
           clear_task_instances(tis, session, dag=big_dag)
           end = time.time()
           print(f"clear_task_instances took {end - start}")
           assert _reschedule_count() == 0
           session.rollback()
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to