hugochinchilla commented on PR #28333:
URL: https://github.com/apache/airflow/pull/28333#issuecomment-1357534240
I don't have time to implement anything more complex than this. I had a
need and this was all I needed to make it work. Since it looks like this is
going to take too much time and effort to land in a release, I've found a
simple workaround that works for me.
If anybody needs to change the default behavior so that DAGs run whenever
any one of their related datasets is updated, just use this DAG:
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(
    schedule=timedelta(minutes=1),
    max_active_runs=1,
    start_date=datetime(2022, 1, 1),
    catchup=False,
)
def schedule_dags_on_datasets_updated():
    @task
    def find_updated_datasets():
        from airflow.models.dataset import (
            DagScheduleDatasetReference,
            DatasetDagRunQueue as DDRQ,
        )
        from airflow.utils.session import create_session
        from sqlalchemy import and_, insert
        from sqlalchemy.sql.functions import now

        with create_session() as session:
            # Find every (dataset, dag) schedule reference that does not
            # already have a matching row in the dataset-dagrun queue.
            select = (
                session.query(
                    DagScheduleDatasetReference.dataset_id,
                    DagScheduleDatasetReference.dag_id,
                    now(),
                )
                .join(
                    DDRQ,
                    and_(
                        DDRQ.dataset_id == DagScheduleDatasetReference.dataset_id,
                        DDRQ.target_dag_id == DagScheduleDatasetReference.dag_id,
                    ),
                    isouter=True,
                )
                .filter(DDRQ.target_dag_id.is_(None))
            )
            # Queue a synthetic event for each missing pair so the scheduler
            # treats every referenced dataset as freshly updated.
            insert_stmt = insert(DDRQ).from_select(
                [DDRQ.dataset_id, DDRQ.target_dag_id, DDRQ.created_at],
                select,
            )
            session.execute(insert_stmt)

    find_updated_datasets()


schedule_dags_on_datasets_updated()
```
This fills the DatasetDagRunQueue with a synthetic event for each missing
(dataset, DAG) pair, so the scheduler triggers a DAG run as soon as any
single one of its datasets has been updated.
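
For context, a DAG scheduled on several datasets normally only triggers once
*all* of them have been updated since its last run; that is the behavior this
workaround changes. A minimal sketch of such a consumer (the dataset URIs and
names here are placeholders, not from the PR):
```python
from datetime import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

# Placeholder datasets for illustration only.
orders = Dataset("s3://example-bucket/orders")
customers = Dataset("s3://example-bucket/customers")


@dag(schedule=[orders, customers], start_date=datetime(2022, 1, 1), catchup=False)
def consumer():
    @task
    def process():
        # By default this only runs after BOTH datasets have been updated;
        # with the workaround above it runs after either one.
        ...

    process()


consumer()
```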
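If you want to confirm that rows are actually being queued, a quick sanity
check (a sketch using the same models as above, run from a Python shell on a
machine with Airflow and DB access configured):
```python
from airflow.models.dataset import DatasetDagRunQueue
from airflow.utils.session import create_session

# Print every queued (dataset, dag) pair waiting to be scheduled.
with create_session() as session:
    for row in session.query(DatasetDagRunQueue).all():
        print(row.dataset_id, row.target_dag_id, row.created_at)
```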