Lee-W commented on code in PR #40478:
URL: https://github.com/apache/airflow/pull/40478#discussion_r1670189170
##########
airflow/models/dag.py:
##########
@@ -3353,6 +3367,38 @@ def bulk_write_to_db(
del new_datasets
del all_datasets
+ # store dataset aliases
Review Comment:
do you mean
```python
# store datasets
stored_datasets: dict[str, DatasetModel] = {}
new_datasets: list[DatasetModel] = []
for dataset in all_datasets:
    stored_dataset = session.scalar(
        select(DatasetModel).where(DatasetModel.uri == dataset.uri).limit(1)
    )
    if stored_dataset:
        # Some datasets may have been previously unreferenced, and therefore orphaned by the
        # scheduler. But if we're here, then we have found that dataset again in our DAGs, which
        # means that it is no longer an orphan, so set is_orphaned to False.
        stored_dataset.is_orphaned = expression.false()
        stored_datasets[stored_dataset.uri] = stored_dataset
    else:
        new_datasets.append(dataset)
dataset_manager.create_datasets(dataset_models=new_datasets, session=session)
stored_datasets.update({dataset.uri: dataset for dataset in new_datasets})
del new_datasets
del all_datasets
```
```python
# store dataset aliases
new_dataset_alias_models: list[DatasetAliasModel] = []
if outlet_dataset_alias_models:
    outlet_dataset_alias_names = [dataset_alias.name for dataset_alias in outlet_dataset_alias_models]

    stored_dataset_alias_names = session.scalars(
        select(DatasetAliasModel.name).where(DatasetAliasModel.name.in_(outlet_dataset_alias_names))
    ).fetchall()
    removed_dataset_alias_names = session.scalars(
        select(DatasetAliasModel.name).where(
            DatasetAliasModel.name.not_in(outlet_dataset_alias_names)
        )
    ).fetchall()

    if stored_dataset_alias_names:
        new_dataset_alias_models = [
            dataset_alias_model
            for dataset_alias_model in outlet_dataset_alias_models
            if dataset_alias_model.name not in stored_dataset_alias_names
        ]
    else:
        new_dataset_alias_models = outlet_dataset_alias_models
    session.add_all(new_dataset_alias_models)

    if removed_dataset_alias_names:
        session.execute(
            delete(DatasetAliasModel).where(DatasetAliasModel.name.in_(removed_dataset_alias_names))
        )
del new_dataset_alias_models
del outlet_dataset_alias_models
```
?

I think they're doing things a bit differently. `DatasetModel` rows are created or updated (a previously orphaned dataset gets `is_orphaned` reset to false), whereas `DatasetAliasModel` rows are created or deleted (aliases whose names are not among the current outlet aliases are removed). The `stored_datasets` mapping is still needed by the subsequent operations, whereas `stored_dataset_alias_names` is not. We could potentially extract some shared operations, but I feel we might end up with a bunch of if-else blocks that complicate the logic. WDYT?
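A rough sketch of the difference, swapping the ORM for plain Python containers (a dict standing in for the dataset table, a set of names for the alias table). `FakeDataset`, `upsert_datasets`, `sync_aliases`, and `partition_keys` are hypothetical names for illustration only, not Airflow APIs, and the alias function follows my reading of the draft above (deletions only happen when there are outlet aliases).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeDataset:
    """Hypothetical stand-in for a DatasetModel row."""

    uri: str
    is_orphaned: bool = False


def upsert_datasets(
    table: dict[str, FakeDataset], incoming: list[FakeDataset]
) -> dict[str, FakeDataset]:
    """Create-or-update: un-orphan existing rows and insert new ones.

    Returns the uri -> row mapping, since later steps still need it.
    """
    stored: dict[str, FakeDataset] = {}
    for dataset in incoming:
        existing = table.get(dataset.uri)
        if existing is not None:
            existing.is_orphaned = False  # seen again, so no longer an orphan
            stored[existing.uri] = existing
        else:
            table[dataset.uri] = dataset  # brand-new row
            stored[dataset.uri] = dataset
    return stored


def sync_aliases(table: set[str], outlet_alias_names: list[str]) -> None:
    """Create-or-delete: add missing aliases, drop names not in the outlets.

    Returns nothing, since later steps do not need the stored names.
    """
    if not outlet_alias_names:
        return
    wanted = set(outlet_alias_names)
    removed = table - wanted
    table |= wanted  # in-place: add aliases that are not stored yet
    table -= removed  # in-place: delete aliases no longer among the outlets


def partition_keys(
    incoming: set[str], stored: set[str]
) -> tuple[set[str], set[str], set[str]]:
    """Hypothetical shared helper: (new, existing, no longer referenced)."""
    return incoming - stored, incoming & stored, stored - incoming
```

Only the key partitioning in `partition_keys` is really common to both paths; what happens afterwards (un-orphan vs. delete, return a mapping vs. nothing) differs, and that is where the extra branching would come from if both blocks shared one helper.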