Lee-W commented on code in PR #40478:
URL: https://github.com/apache/airflow/pull/40478#discussion_r1670189170


##########
airflow/models/dag.py:
##########
@@ -3353,6 +3367,38 @@ def bulk_write_to_db(
         del new_datasets
         del all_datasets
 
+        # store dataset aliases

Review Comment:
   do you mean 
   
   ```python
   # store datasets
   stored_datasets: dict[str, DatasetModel] = {}
   new_datasets: list[DatasetModel] = []
   for dataset in all_datasets:
       stored_dataset = session.scalar(
           select(DatasetModel).where(DatasetModel.uri == dataset.uri).limit(1)
       )
       if stored_dataset:
            # Some datasets may have been previously unreferenced, and therefore orphaned by
            # the scheduler. But if we're here, then we have found that dataset again in our
            # DAGs, which means that it is no longer an orphan, so set is_orphaned to False.
           stored_dataset.is_orphaned = expression.false()
           stored_datasets[stored_dataset.uri] = stored_dataset
       else:
           new_datasets.append(dataset)
   dataset_manager.create_datasets(dataset_models=new_datasets, session=session)
   stored_datasets.update({dataset.uri: dataset for dataset in new_datasets})
   
   del new_datasets
   del all_datasets
   ```
   
   ```python
   # store dataset aliases
   new_dataset_alias_models: list[DatasetAliasModel] = []
   if outlet_dataset_alias_models:
        outlet_dataset_alias_names = [
            dataset_alias.name for dataset_alias in outlet_dataset_alias_models
        ]
   
        stored_dataset_alias_names = session.scalars(
            select(DatasetAliasModel.name).where(
                DatasetAliasModel.name.in_(outlet_dataset_alias_names)
            )
        ).fetchall()
       removed_dataset_alias_names = session.scalars(
           select(DatasetAliasModel.name).where(
               DatasetAliasModel.name.not_in(outlet_dataset_alias_names)
           )
       ).fetchall()
   
       if stored_dataset_alias_names:
           new_dataset_alias_models = [
               dataset_alias_model
               for dataset_alias_model in outlet_dataset_alias_models
               if dataset_alias_model.name not in stored_dataset_alias_names
           ]
       else:
           new_dataset_alias_models = outlet_dataset_alias_models
       session.add_all(new_dataset_alias_models)
   
       if removed_dataset_alias_names:
            session.execute(
                delete(DatasetAliasModel).where(
                    DatasetAliasModel.name.in_(removed_dataset_alias_names)
                )
            )
   
   del new_dataset_alias_models
   del outlet_dataset_alias_models
   ```
   ?
   
   I think they're doing things a bit differently.
   While `DatasetModel` is used for creating and updating, `DatasetAliasModel` is used for creating and deleting. The `stored_datasets` are still needed for the subsequent operations, whereas the `stored_dataset_alias_names` are not. We could extract some shared operations, but I suspect we'd end up with a bunch of if-else blocks that complicate the logic. WDYT?
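   For illustration only, the contrast between the two flows can be sketched with plain containers standing in for the ORM models. The helper names below are hypothetical, not from the PR:

   ```python
   # Hypothetical sketch: datasets follow an upsert flow, while dataset aliases
   # follow a create-and-delete (full sync) flow. Plain dicts/sets stand in for
   # the DatasetModel / DatasetAliasModel rows.

   def sync_datasets(stored: dict[str, dict], incoming: dict[str, dict]) -> dict[str, dict]:
       """Upsert: update rows we already have (un-orphan them), insert new ones.
       Rows absent from `incoming` are kept; orphaning is left to the scheduler."""
       result = {uri: dict(row) for uri, row in stored.items()}
       for uri, row in incoming.items():
           if uri in result:
               result[uri]["is_orphaned"] = False  # seen again in a DAG
           else:
               result[uri] = dict(row)  # newly created dataset
       return result

   def sync_aliases(stored: set[str], incoming: set[str]) -> tuple[set[str], set[str]]:
       """Full sync: return (names to create, names to delete)."""
       return incoming - stored, stored - incoming
   ```

   The two flows keep and drop different things, which is why folding them into one shared helper tends to grow if-else branches rather than simplify anything.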


