FredericoCoelhoNunes opened a new issue #14604:
URL: https://github.com/apache/airflow/issues/14604
**Apache Airflow version**: 2.0.1
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
**Environment**:
- **Cloud provider or hardware configuration**:
- **OS** (e.g. from /etc/os-release): CentOS Linux 7 (Core)
- **Kernel** (e.g. `uname -a`): Linux rstudio006-18-w6fn7 3.10.0-1160.11.1.el7.x86_64 #1 SMP Mon Nov 30 13:05:31 EST 2020 x86_64 x86_64 x86_64 GNU/Linux
- **Install tools**: pip
- **Others**:
**What happened**:
I have several failed DAG runs for different execution dates. The `max_active_runs_per_dag` parameter is 1, and I am using the LocalExecutor.
I used the Airflow REST API to delete one of these runs and, immediately afterwards, triggered it again (same execution date).
However, over the course of a few minutes, all of my other failed runs were moved to the "Running" state simultaneously, disregarding both the `max_active_runs_per_dag` parameter and the fact that I issued only a single trigger request.
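For reference, the isolated delete-and-retrigger step looks roughly like the sketch below (the host/port, credentials, DAG id, run id, and execution date are placeholders for my real values; the full reproduction script further down does this in a loop):
```
# Sketch of the single delete + re-trigger that precedes the cascade.
# All values below are placeholders.
import json
import requests

BASE = "http://localhost:1111/api/v1"
AUTH = ("<username>", "<password>")
DAG_ID = "<your DAG id>"
RUN_ID = "<run_id of the failed DAG run>"
EXECUTION_DATE = "<execution date of that run, ISO 8601>"

# 1. Delete the failed DAG run.
r = requests.delete(f"{BASE}/dags/{DAG_ID}/dagRuns/{RUN_ID}", auth=AUTH)
assert r.status_code == 204

# 2. Immediately trigger a new run for the same execution date.
r = requests.post(
    f"{BASE}/dags/{DAG_ID}/dagRuns",
    data=json.dumps({"execution_date": EXECUTION_DATE}),
    headers={"Content-type": "application/json", "Accept": "application/json"},
    auth=AUTH,
)
assert r.status_code == 200
```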
**What you expected to happen**:
I expected a single DAG run to be deleted, and a single DAG run to be
created.
**How to reproduce it**:
Install Airflow and configure a Postgres database (`airflow_db`, user `airflow`, password `airflow`). Create any DAG with a `@weekly` schedule and a `start_date` a few years in the past (a minimal example is sketched right after this paragraph). Run the DAG for all of its execution dates and make sure they all fail.
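Something like the following sketch reproduces that setup (the `dag_id`, `start_date`, and failing task here are illustrative placeholders, not my exact DAG):
```
# Minimal sketch of a DAG matching the setup above: @weekly schedule,
# start_date a few years in the past, and a task that always fails.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def always_fail():
    raise RuntimeError("Failing on purpose so every run ends up in the 'failed' state")


with DAG(
    dag_id="my_failing_dag",            # placeholder id; use the same value as DAG_ID in the script below
    schedule_interval="@weekly",
    start_date=datetime(2019, 1, 1),    # a few years back, so catchup creates many runs
    catchup=True,
) as dag:
    PythonOperator(task_id="fail_task", python_callable=always_fail)
```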
Then run the following Python script to delete and re-trigger the failed DAG runs one by one. Check the UI and watch several runs being triggered in cascade before the first run even completes:
```
"""
Clears and re-runs all failed instances of a DAG.
"""
import psycopg2
import requests
import time
import json

AUTH = (<username>, <your password>)
DAG_ID = <Your DAG ID>

# Getting the failed DAG runs from the metadata database
conn = psycopg2.connect(dbname='airflow_db', user='airflow_user', password='airflow_user')
cursor = conn.cursor()
try:
    cursor.execute(f"""
        select run_id, execution_date
        from dag_run
        where dag_id = '{DAG_ID}'
        and state = 'failed'
    """)
    runs = [
        {
            'run_id': r[0],
            'execution_date': r[1].isoformat()
        }
        for r in cursor.fetchall()
    ]
finally:
    cursor.close()
    conn.close()

print(f"Found {len(runs)} failed DAG runs")

# Iterating over the failed runs, one at a time
for run in runs:
    ### Delete the run.
    r = requests.delete(
        f"http://localhost:1111/api/v1/dags/{DAG_ID}/dagRuns/{run['run_id']}",
        auth=AUTH
    )
    assert r.status_code == 204
    print(f"Successfully deleted {run['run_id']}")

    ### Trigger it again for the same execution date
    payload = {
        "execution_date": run['execution_date']
    }
    headers = {
        "Content-type": "application/json",
        "Accept": "application/json"
    }
    r = requests.post(
        f"http://localhost:1111/api/v1/dags/{DAG_ID}/dagRuns",
        data=json.dumps(payload),
        headers=headers,
        auth=AUTH
    )
    assert r.status_code == 200
    print(f"Successfully triggered {run['run_id']}")

    ### Poll for the run's state. Even before leaving this loop, more runs are being triggered.
    state = ''
    failed_again = False
    while state != 'success':
        time.sleep(5)
        params = {
            'execution_date_gte': run['execution_date'],
            'execution_date_lte': run['execution_date']
        }
        r = requests.get(
            f"http://localhost:1111/api/v1/dags/{DAG_ID}/dagRuns",
            params=params,
            auth=AUTH
        )
        assert r.status_code == 200
        state = r.json()['dag_runs'][0]['state']
        print(f"State of {run['run_id']}: {state}")
        if state == 'failed':
            failed_again = True
            break

    if failed_again:
        print(f"{run['run_id']} failed again. Moving to next one")
    else:
        print(f"{run['run_id']} ran successfully")
```
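While the script is running, the cascade can also be confirmed directly from the metadata database rather than the UI; a quick check like this sketch (same placeholder connection details as above) reports more than one run in the "running" state even though `max_active_runs_per_dag` is 1:
```
# Sketch: count dag_runs currently in the 'running' state for this DAG.
# Connection details and DAG_ID are placeholders, as in the script above.
import psycopg2

DAG_ID = "<Your DAG ID>"
conn = psycopg2.connect(dbname='airflow_db', user='airflow_user', password='airflow_user')
cursor = conn.cursor()
try:
    cursor.execute(
        "select count(*) from dag_run where dag_id = %s and state = 'running'",
        (DAG_ID,),
    )
    print(f"Runs currently in 'running' state: {cursor.fetchone()[0]}")
finally:
    cursor.close()
    conn.close()
```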
**Anything else we need to know**:
The problem occurs every time.