dhatch-niv opened a new issue, #27509:
URL: https://github.com/apache/airflow/issues/27509
### Apache Airflow version
2.4.2
### What happened
I have a DAG that is triggered by three datasets. When I remove one or more
of these datasets, the web server fails to update the DAG, and `airflow dags
reserialize` fails with an `AssertionError` within SQLAlchemy. Full stack trace
below:
```
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
docker-airflow-scheduler-1 |     return func(*args, session=session, **kwargs)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 781, in process_file
docker-airflow-scheduler-1 |     dagbag.sync_to_db(processor_subdir=self._dag_directory, session=session)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
docker-airflow-scheduler-1 |     return func(*args, **kwargs)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 644, in sync_to_db
docker-airflow-scheduler-1 |     for attempt in run_with_db_retries(logger=self.log):
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 382, in __iter__
docker-airflow-scheduler-1 |     do = self.iter(retry_state=retry_state)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/tenacity/__init__.py", line 349, in iter
docker-airflow-scheduler-1 |     return fut.result()
docker-airflow-scheduler-1 |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
docker-airflow-scheduler-1 |     return self.__get_result()
docker-airflow-scheduler-1 |   File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
docker-airflow-scheduler-1 |     raise self._exception
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 658, in sync_to_db
docker-airflow-scheduler-1 |     DAG.bulk_write_to_db(
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
docker-airflow-scheduler-1 |     return func(*args, **kwargs)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2781, in bulk_write_to_db
docker-airflow-scheduler-1 |     session.flush()
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
docker-airflow-scheduler-1 |     self._flush(objects)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
docker-airflow-scheduler-1 |     transaction.rollback(_capture_exception=True)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
docker-airflow-scheduler-1 |     compat.raise_(
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
docker-airflow-scheduler-1 |     raise exception
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
docker-airflow-scheduler-1 |     flush_context.execute()
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
docker-airflow-scheduler-1 |     rec.execute(self)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 577, in execute
docker-airflow-scheduler-1 |     self.dependency_processor.process_deletes(uow, states)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 552, in process_deletes
docker-airflow-scheduler-1 |     self._synchronize(
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/dependency.py", line 610, in _synchronize
docker-airflow-scheduler-1 |     sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
docker-airflow-scheduler-1 |   File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/sync.py", line 86, in clear
docker-airflow-scheduler-1 |     raise AssertionError(
docker-airflow-scheduler-1 | AssertionError: Dependency rule tried to blank-out primary key column 'dataset_dag_run_queue.dataset_id' on instance '<DatasetDagRunQueue at 0xffff5d213d00>'
```
### What you think should happen instead
Currently, the DAG fails to load in the UI and no error is surfaced there.
Instead, the removed datasets should be dropped as dependencies and the DAG
should be updated with the new dataset dependencies.
### How to reproduce
Initial DAG:
```python
import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task


def foo():
    pass

@dag(
dag_id="test",
start_date=pendulum.datetime(2022, 1, 1),
catchup=False,
schedule=[
Dataset('test/1'),
Dataset('test/2'),
Dataset('test/3'),
]
)
def test_dag():
@task
def test_task():
foo()
test_task()
test_dag()
```
At least one of the datasets must be in the 'ready' state. At that point,
`dataset_dag_run_queue` will look something like this:
```
airflow=# SELECT * FROM dataset_dag_run_queue ;
dataset_id | target_dag_id | created_at
------------+-------------------------------------+-------------------------------
16 | test | 2022-11-02 19:47:53.938748+00
(1 row)
```
Then, update the DAG with new datasets:
```python
import pendulum

from airflow.datasets import Dataset
from airflow.decorators import dag, task


def foo():
    pass

@dag(
dag_id="test",
start_date=pendulum.datetime(2022, 1, 1),
catchup=False,
schedule=[
Dataset('test/new/1'), # <--- updated
Dataset('test/new/2'),
Dataset('test/new/3'),
]
)
def test_dag():
@task
def test_task():
foo()
test_task()
test_dag()
```
Now you will observe the error in the web server logs, or when running
`airflow dags reserialize`.
I suspect this issue is related to the handling of cascading deletes on the
`dataset_id` foreign key of the run queue table; dataset `id = 16` is one of
the datasets that was renamed.
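For what it's worth, the same `AssertionError` can be reproduced with a stripped-down SQLAlchemy sketch. These are not Airflow's real model definitions — the table and column names are just borrowed from the error message — but they show the mechanism: the child's foreign key is part of its composite primary key, and the relationship has no delete cascade, so deleting the parent makes SQLAlchemy try to blank the key instead of deleting the row:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class DatasetModel(Base):
    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String, nullable=False)
    # No delete cascade configured: on parent delete, SQLAlchemy's unit of
    # work tries to NULL the child's foreign key at flush time.
    queue_records = relationship("DatasetDagRunQueue")


class DatasetDagRunQueue(Base):
    __tablename__ = "dataset_dag_run_queue"
    # The foreign key is part of the composite primary key, so it cannot be
    # blanked out -- sync.clear() raises AssertionError instead.
    dataset_id = Column(Integer, ForeignKey("dataset.id"), primary_key=True)
    target_dag_id = Column(String, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

msg = ""
with Session(engine) as session:
    ds = DatasetModel(id=16, uri="test/1")
    session.add(ds)
    session.add(DatasetDagRunQueue(dataset_id=16, target_dag_id="test"))
    session.commit()

    # Analogous to the dataset being dropped from the DAG's schedule while
    # a queue row still references it.
    session.delete(ds)
    try:
        session.flush()
    except AssertionError as exc:
        msg = str(exc)

print(msg)
```

If this is indeed the cause, configuring a delete cascade on the queue relationship (e.g. `cascade="all, delete-orphan"`, or `ondelete="CASCADE"` on the FK with `passive_deletes=True`) would presumably make SQLAlchemy delete the stale queue rows instead of trying to blank the key — but that is my guess, not a confirmed fix.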
### Operating System
docker image - apache/airflow:2.4.2-python3.9
### Versions of Apache Airflow Providers
```
apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-docker==3.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-google==8.4.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-microsoft-azure==4.3.0
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-odbc==3.1.2
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==3.2.1
apache-airflow-providers-ssh==3.2.0
```
### Deployment
Docker-Compose
### Deployment details
Running using docker-compose locally.
### Anything else
To trigger this problem, the dataset being removed must be in the "ready"
state, so that an entry for it exists in `dataset_dag_run_queue`.
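As a possible manual workaround (my own untested guess, not a documented procedure — back up the metadata database first), clearing the stale queue entry before reserializing should avoid the failing delete:

```sql
-- Remove the queue entry that still references the renamed dataset.
-- The ids and dag_id will differ per installation; inspect the table
-- with a SELECT first before deleting anything.
DELETE FROM dataset_dag_run_queue WHERE target_dag_id = 'test';
```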
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)