michaelmicheal opened a new issue, #29162:
URL: https://github.com/apache/airflow/issues/29162
### Description
To increase the extensibility and integration capabilities of Datasets, I
think we should
1. Add a `register_external_dataset_change(self, dataset: Dataset, extra=None, session: Session)` method to `DatasetManager`. This would allow updating a dataset without a `task_instance`, which is necessary for updating datasets across Airflow instances.
```python
def register_external_dataset_change(
    self, *, dataset: Dataset, extra=None, session: Session, **kwargs
) -> None:
    """
    For local datasets, look them up, record the dataset event,
    queue dagruns, and broadcast the dataset event.
    """
    dataset_model = (
        session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
    )
    if not dataset_model:
        self.log.info("DatasetModel %s not found", dataset.uri)
        return
    session.add(
        DatasetEvent(
            dataset_id=dataset_model.id,
            extra=extra,
        )
    )
    if dataset_model.consuming_dags:
        self._queue_dagruns(dataset_model, session)
    session.flush()
```
Alternatively, `task_instance` could be made an optional parameter of the existing `register_dataset_change` method.
2. Add an `update_dataset` endpoint. This endpoint would call the
`register_external_dataset_change` method and register a dataset change without
a `task_instance` or `dag_id` (see the sketch after this list).
3. Formalize the concept of an "external" dataset update, and possibly even
add a parameter to the `Dataset` definition indicating whether or not it should
accept external dataset updates. This would allow the Datasets graph in the UI
to show that a particular Dataset can be triggered externally.
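
To illustrate point 2, a call to the proposed endpoint from another Airflow instance could look roughly like this. The path, payload shape, and auth are placeholders, not an existing Airflow REST API:
```python
import requests

# Hypothetical path and payload; the real endpoint name and schema would be settled in the PR.
response = requests.post(
    "https://airflow-instance-b.example.com/api/v1/datasets/events",
    json={"dataset_uri": "s3://bucket/my-dataset", "extra": {"source": "instance-a"}},
    auth=("username", "password"),
    timeout=10,
)
response.raise_for_status()
```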
### Use case/motivation
This year we moved to a multi-instance Airflow architecture, where we deploy
multiple instances of Airflow in production. With Datasets, each instance of
Airflow has its own set of datasets, but we need to manage dataset
dependencies across instances.
We've taken advantage of the great extensibility of the configurable
DatasetManager (kudos to whoever designed that) by overriding the
`register_dataset_change` method to broadcast the DatasetEvent to each instance.
```python
from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.models import TaskInstance


class CustomDatasetManager(DatasetManager):
    def register_dataset_change(
        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
    ) -> None:
        # Record the dataset event and trigger DAGs in the local instance
        super().register_dataset_change(
            task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs
        )
        # Send a request to the other instances to trigger DAGs that depend on the dataset
        # (instance_urls holds the other Airflow webserver base URLs, defined elsewhere)
        for instance_url in instance_urls:
            url = f"{instance_url}/api/v1/shopify/admin/custom_endpoint"
            # execute request
```
To make this work, we add a custom endpoint for registering a dataset change,
which calls `register_external_dataset_change`. Since a separate Airflow instance
doesn't know about the DAG or task_instance outlet that triggered the dataset
update, our endpoint calls the custom external-change method that we added to
our custom DatasetManager (a rough sketch of that endpoint is below).
This works because the dag- and task_instance-related columns on DatasetEvent
are nullable.
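
For reference, a minimal sketch of what that receiving endpoint can look like as an Airflow plugin. The blueprint path, payload shape, and the `CustomDatasetManager` module path are our own hypothetical names, not existing Airflow APIs:
```python
from flask import Blueprint, jsonify, request

from airflow.datasets import Dataset
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.session import create_session

# CustomDatasetManager is the subclass shown above; this module path is hypothetical.
from our_plugins.dataset_manager import CustomDatasetManager

bp = Blueprint("external_dataset_events", __name__, url_prefix="/api/v1/custom")


@bp.route("/datasets/events", methods=["POST"])
def register_external_dataset_change():
    payload = request.get_json(force=True)
    with create_session() as session:
        # Record the event and queue downstream DAG runs without a task_instance
        CustomDatasetManager().register_external_dataset_change(
            dataset=Dataset(payload["dataset_uri"]),
            extra=payload.get("extra"),
            session=session,
        )
    return jsonify({"status": "ok"})


class ExternalDatasetEventsPlugin(AirflowPlugin):
    name = "external_dataset_events"
    flask_blueprints = [bp]
```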
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)