yuqian90 edited a comment on pull request #14048:
URL: https://github.com/apache/airflow/pull/14048#issuecomment-775124153
Hi @kaxil this is the experiment I did.
`test_create_large_dag_with_task_reschedule` creates a large number of `TaskInstance` and
`TaskReschedule` rows in the db. Then `test_clear_large_dag_with_task_reschedule`
measures how long it takes to call `clear_task_instances` on them.
```python
import pytest
@pytest.fixture(scope="module")
def big_dag():
    """Yield a DAG populated with 1000 reschedule-mode ``PythonSensor`` tasks.

    The fixture is module-scoped, so the same DAG object is handed to every
    test in the module that requests it. Each sensor's callable always
    returns ``False``.
    """
    dag_span = datetime.timedelta(days=30)
    with DAG(
        'test_create_large_dag_with_task_reschedule',
        start_date=DEFAULT_DATE,
        end_date=DEFAULT_DATE + dag_span,
    ) as dag:
        # Operators instantiated inside the ``with`` block attach to ``dag``.
        for task_number in range(1000):
            PythonSensor(
                task_id=f'task_{task_number}',
                python_callable=lambda: False,
                mode="reschedule",
            )
    yield dag
import copy
def test_create_large_dag_with_task_reschedule(big_dag):
    """Fill the db with DAG runs and ``TaskReschedule`` rows for ``big_dag``.

    Creates one running, scheduled DAG run for each of 10 consecutive
    execution dates starting at ``DEFAULT_DATE``, builds a ``TI`` for every
    task on every date, and bulk-inserts 5 ``TaskReschedule`` rows per ``TI``
    in batches of 10000.
    """
    task_instances = []
    for day_offset in range(10):
        execution_date = DEFAULT_DATE + datetime.timedelta(days=day_offset)
        big_dag.create_dagrun(
            execution_date=execution_date,
            state=State.RUNNING,
            run_type=DagRunType.SCHEDULED,
        )
        # Each TI gets its own shallow copy of the task object.
        task_instances.extend(
            TI(task=copy.copy(task), execution_date=execution_date)
            for task in big_dag.tasks
        )

    import pendulum

    reschedules = [
        TaskReschedule(
            task=ti.task,
            execution_date=ti.execution_date,
            try_number=1,
            start_date=pendulum.now(),
            end_date=pendulum.now(),
            reschedule_date=pendulum.now(),
        )
        for ti in task_instances
        for _ in range(5)
    ]

    # Plain dicts, one per row, so the insert can run as a multi-row execute.
    rows = [
        {
            "task_id": tr.task_id,
            "dag_id": tr.dag_id,
            "execution_date": tr.execution_date,
            "try_number": tr.try_number,
            "start_date": tr.start_date,
            "end_date": tr.end_date,
            "duration": tr.duration,
            "reschedule_date": tr.reschedule_date,
        }
        for tr in reschedules
    ]

    batch_size = 10000
    with create_session() as session:
        for offset in range(0, len(rows), batch_size):
            batch = rows[offset:offset + batch_size]
            session.execute(TaskReschedule.__table__.insert(), batch)
            session.commit()
def test_clear_large_dag_with_task_reschedule(big_dag):
    """Time ``clear_task_instances`` over every ``TI`` row of ``big_dag``.

    Expects 1000 * 10 * 5 ``TaskReschedule`` rows with ``try_number == 1``
    for the DAG to already be in the db (the amount inserted by
    ``test_create_large_dag_with_task_reschedule``). Prints the elapsed time
    of the ``clear_task_instances`` call and asserts that no such rows remain
    afterwards.
    """
    import time

    with create_session() as session:

        def count_task_reschedule():
            """Count this DAG's ``TaskReschedule`` rows with try_number 1."""
            return (
                session.query(TaskReschedule)
                .filter(
                    TaskReschedule.dag_id == big_dag.dag_id,
                    TaskReschedule.try_number == 1,
                )
                .count()
            )

        assert count_task_reschedule() == 1000 * 10 * 5

        # Load the task instances up front so only the clear call is timed.
        qry = session.query(TI).filter(TI.dag_id == big_dag.dag_id).all()

        # perf_counter() is monotonic and high-resolution; time.time() is a
        # wall clock that can jump if the system clock is adjusted.
        start = time.perf_counter()
        clear_task_instances(qry, session, dag=big_dag)
        elapsed = time.perf_counter() - start
        print(f"clear_task_instances took {elapsed}")

        assert count_task_reschedule() == 0

        # NOTE(review): presumably rolls back so the cleared rows are kept
        # for repeated benchmark runs -- confirm.
        session.rollback()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org