fuechter opened a new issue, #30636:
URL: https://github.com/apache/airflow/issues/30636
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Version Airflow: 2.5.1
I have a Python file that generates dynamic DAGs. Every time the code in that
file changes, I have to run `./airflow.sh dags reserialize`, but sometimes the
command fails with an error like the one below:
```
[2023-04-13 19:46:54,669] {dag.py:2690} INFO - Sync 35 DAGs
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line
39, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
line 52, in command
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py",
line 75, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line
108, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/dag_command.py",
line 492, in dag_reserialize
dagbag.sync_to_db(session=session)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py",
line 645, in sync_to_db
for attempt in run_with_db_retries(logger=self.log):
File
"/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line
384, in __iter__
do = self.iter(retry_state=retry_state)
File
"/home/airflow/.local/lib/python3.7/site-packages/tenacity/__init__.py", line
351, in iter
return fut.result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 428, in
result
return self.__get_result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in
__get_result
raise self._exception
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py",
line 660, in sync_to_db
self.dags.values(), processor_subdir=processor_subdir, session=session
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dag.py", line
2701, in bulk_write_to_db
orm_dags: list[DagModel] = with_row_locks(query, of=DagModel,
session=session).all()
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py",
line 2773, in all
return self._iter().all()
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py",
line 2919, in _iter
execution_options={"_sa_orm_load_options": self.load_options},
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 1713, in execute
conn = self._connection_for_bind(bind)
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 1553, in _connection_for_bind 2-tetris-linkedin-hourly-days',
'fileloc': '/opt/airflow/dags/register_dynamic_dag.py', 'fileloc_hash':
51105027638417678, 'data': '{"__version": 1, "dag": {"_dag_id":
"96ddcc3b-900a-44a7-bda9-81b9eeserialized DAG:
/opt/airflow/dags/register_alerts_resport_dag.py
engine, execution_options
python3.7/site-packages/airflow/models/dagbag.py", line 631, in
_serialize_dag_capturing_errors
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 721, in _connection_for_bind
self._assert_active()
File
"/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py",
line 608, in _assert_active
code="7s2a",
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates
unique constraint "serialized_dag_pkey"
DETAIL: Key
(dag_id)=(96ddcc3b-900a-44a7-bda9-81b9eefde4d2-dynamic-dag-hourly-days) already
exists.
```
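To illustrate the failure mode only (this is not Airflow's code), here is a minimal stdlib sketch of the same kind of primary-key collision. The table `serialized_dag_demo` and the helpers `make_table`, `plain_insert` and `upsert` are hypothetical names invented for this example, and SQLite stands in for PostgreSQL. It shows that a second plain insert of the same `dag_id` raises an integrity error, while a write that replaces the existing row does not.

```python
import sqlite3


def make_table() -> sqlite3.Connection:
    """Create an in-memory table keyed by dag_id (hypothetical stand-in)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE serialized_dag_demo (dag_id TEXT PRIMARY KEY, data TEXT)"
    )
    return conn


def plain_insert(conn: sqlite3.Connection, dag_id: str, data: str) -> None:
    """Insert a row; raises sqlite3.IntegrityError if dag_id already exists."""
    conn.execute(
        "INSERT INTO serialized_dag_demo (dag_id, data) VALUES (?, ?)",
        (dag_id, data),
    )


def upsert(conn: sqlite3.Connection, dag_id: str, data: str) -> None:
    """Insert a row, replacing any existing row with the same dag_id."""
    conn.execute(
        "INSERT OR REPLACE INTO serialized_dag_demo (dag_id, data) VALUES (?, ?)",
        (dag_id, data),
    )


if __name__ == "__main__":
    conn = make_table()
    plain_insert(conn, "company-1-facebook", "v1")
    try:
        # Second writer inserting the same key: the unique constraint fires.
        plain_insert(conn, "company-1-facebook", "v2")
    except sqlite3.IntegrityError as exc:
        print("duplicate key:", exc)
    # Replacing the row instead of inserting blindly does not raise.
    upsert(conn, "company-1-facebook", "v3")
    print(conn.execute("SELECT dag_id, data FROM serialized_dag_demo").fetchall())
```

Whether the real error comes from two processes writing the same row at the same time is an assumption on my side; the sketch only shows what the constraint violation looks like in isolation.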
### What you think should happen instead
I think Airflow should reserialize the DAGs without failing on a duplicate
key.
### How to reproduce
This is the Python file that registers the dynamic DAGs (I shortened the code to
focus on the problem).
Here the data is hard-coded as a Python dict, but in the real setup it is a JSON
file that is occasionally updated by another DAG.
```
import pytz
from datetime import datetime, timedelta
from airflow.decorators import dag, task

data = {
    'companies': [{
        'id': '1233',
        'slug': 'company-1',
        'timezone': {
            'id': 'America/Sao_Paulo'
        },
        'connections': [{
            'id': 'facebook',
        }, {
            'id': 'instagram',
        }]
    }, {
        'id': '1234',
        'slug': 'company-2',
        'timezone': {
            'id': 'America/Sao_Paulo'
        },
        'connections': [{
            'id': 'facebook',
        }, {
            'id': 'instagram',
        }, {
            'id': 'twitter',
        }]
    }]
}


def create_dynamic_dag(dag_id, company):
    schedule_interval = "* 5 * * * *"
    local_tz = pytz.timezone(company['timezone']['id'])
    start_date = datetime.now(local_tz) - timedelta(days=7, hours=0)

    @dag(
        dag_id=dag_id,
        schedule=schedule_interval,
        start_date=start_date,
        catchup=False,
        max_active_runs=1,
    )
    def dynamic_dag():
        @task.branch
        def ping():
            # ...code to ping
            # return 'done' or 'notify_error'
            pass

        @task
        def notify_error():
            # ...code to notify error
            pass

        @task
        def done():
            # ... code to done
            pass

        ping_instance = ping()
        notify_error_instance = notify_error()
        done_instance = done()
        ping_instance >> [done_instance, notify_error_instance]

    dagD = dynamic_dag()
    globals()[dag_id] = dagD
    return dagD


for company in data['companies']:
    for connection in company['connections']:
        dag_id = f"{company['id']}-{company['slug']}-{connection['id']}"
        dagD = create_dynamic_dag(dag_id, company)
```
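One thing worth ruling out is that the input data itself produces the same `dag_id` twice. The following is a minimal sketch, assuming the data has the same `companies` / `connections` layout as in the file above; `build_dag_ids` and `find_duplicate_dag_ids` are hypothetical helper names introduced only for this check, and they do not touch Airflow at all.

```python
from collections import Counter
from typing import Dict, List


def build_dag_ids(data: Dict) -> List[str]:
    """Build dag_ids with the same f-string pattern used in the DAG file."""
    return [
        f"{company['id']}-{company['slug']}-{connection['id']}"
        for company in data['companies']
        for connection in company['connections']
    ]


def find_duplicate_dag_ids(data: Dict) -> List[str]:
    """Return every dag_id that would be generated more than once, sorted."""
    counts = Counter(build_dag_ids(data))
    return sorted(dag_id for dag_id, n in counts.items() if n > 1)


if __name__ == "__main__":
    sample = {
        'companies': [{
            'id': '1233',
            'slug': 'company-1',
            'connections': [{'id': 'facebook'}, {'id': 'facebook'}],
        }]
    }
    # The repeated 'facebook' connection yields the same dag_id twice.
    print(find_duplicate_dag_ids(sample))
```

If this returns an empty list for the real JSON file, duplicate ids in the input are probably not the cause, and the collision would have to come from somewhere else.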
### Operating System
macOS, but I'm running Airflow in Docker
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
I only followed the documentation for [Airflow in
Docker](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html).
### Anything else
Sometimes this error doesn't occur, but it is happening more and more frequently.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)