michaelmicheal opened a new issue, #29162:
URL: https://github.com/apache/airflow/issues/29162

   ### Description
   
   To increase the extensibility and integration capabilities of Datasets, I 
think we should
   
   1. Add a `register_external_dataset_change(self, dataset: Dataset, extra=None, session: Session)` method in DatasetManager. This would allow updating a dataset without a task_instance, which is necessary for propagating dataset updates across Airflow instances.
   ```python
   def register_external_dataset_change(
       self, *, dataset: Dataset, session: Session, extra=None, **kwargs
   ) -> None:
       """
       For local datasets, look them up, record the dataset event, queue dagruns,
       and broadcast the dataset event.
       """
       dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
       if not dataset_model:
           self.log.info("DatasetModel %s not found", dataset.uri)
           return
       session.add(
           DatasetEvent(
               dataset_id=dataset_model.id,
               extra=extra,
           )
       )
       if dataset_model.consuming_dags:
           self._queue_dagruns(dataset_model, session)
       session.flush()
   ```
   Alternatively, task_instance could be an optional parameter.
   2. Add an `update_dataset` endpoint. This endpoint would call the `register_external_dataset_change` method and register a dataset change without a `task_instance` or `dag_id` (see the sketch after this list).
   3. Formalize the concept of an "external" dataset update and possibly even add a parameter to the Dataset definition to indicate whether or not it should accept external dataset updates. This would allow the externally triggered nature of a particular Dataset to be displayed in the Datasets graph in the UI.
   
   ### Use case/motivation
   
   This year we moved to a multi-instance Airflow architecture, where we deploy multiple instances of Airflow in production. With Datasets, each instance of Airflow has its own set of datasets, but we need to manage dataset dependencies across instances.
   
   We've taken advantage of the great extensibility of the configurable 
DatasetManager (kudos to whoever designed that) by overriding the 
`register_dataset_change` method to broadcast the DatasetEvent to each instance.
   
   ```python
   from sqlalchemy.orm import Session
   import requests
   
   from airflow.datasets import Dataset
   from airflow.datasets.manager import DatasetManager
   from airflow.models.taskinstance import TaskInstance
   
   
   class CustomDatasetManager(DatasetManager):
       def register_dataset_change(
           self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
       ) -> None:
           # Record the dataset event and trigger DAGs in the local instance
           super().register_dataset_change(
               task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs
           )
   
           # Send a request to the other instances to trigger DAGs that depend on the dataset
           # (instance_urls is configured elsewhere; the payload shape is illustrative)
           for instance_url in instance_urls:
               url = f"{instance_url}/api/v1/shopify/admin/custom_endpoint"
               requests.post(url, json={"dataset_uri": dataset.uri, "extra": extra})
   ```
   To make this work, we add a custom endpoint for registering a dataset change, `register_external_dataset_change`. Since a separate Airflow instance doesn't know about the DAG or task_instance outlet that triggered the dataset update, our endpoint calls a custom external-change method that we added to our custom DatasetManager.
   
   This works because DatasetEvent's dag- and task_instance-related columns are nullable.
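   
   A minimal sketch of what such an endpoint could look like as an Airflow plugin is below. The blueprint name, route, and payload shape are our own conventions, and `register_external_dataset_change` here refers to the method we added to our custom DatasetManager (as proposed above), not an existing Airflow API.
   
   ```python
   from flask import Blueprint, request
   
   from airflow.datasets import Dataset
   from airflow.plugins_manager import AirflowPlugin
   from airflow.utils.session import create_session
   
   external_dataset_bp = Blueprint(
       "external_dataset_change", __name__, url_prefix="/api/v1/shopify/admin"
   )
   
   
   @external_dataset_bp.route("/custom_endpoint", methods=["POST"])
   def external_dataset_change():
       # Payload shape is our own convention: {"dataset_uri": ..., "extra": {...}}
       payload = request.get_json()
       manager = CustomDatasetManager()
       with create_session() as session:
           manager.register_external_dataset_change(
               dataset=Dataset(uri=payload["dataset_uri"]),
               extra=payload.get("extra"),
               session=session,
           )
       return {"status": "registered"}, 200
   
   
   class ExternalDatasetChangePlugin(AirflowPlugin):
       name = "external_dataset_change_plugin"
       flask_blueprints = [external_dataset_bp]
   ```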
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

