mrn-aglic opened a new issue, #34044:
URL: https://github.com/apache/airflow/issues/34044
### Description
Airflow should support defining different types of dataset-event consumption for
consumer DAGs. For example, if a producer DAG has created multiple dataset
events, the consumer DAG should be able to choose whether to digest these
events one by one or as a whole (for now).
The current implementation is non-deterministic: the scheduler picks up the
dataset events, which may have been stored in the database by one or more DAG
runs, and creates a single DAG run. Sometimes the data intervals of the
producer and consumer correspond; sometimes they do not.
It would be nice if different types of consumers were also supported. For
example, in our use case, we would need the consumer to replicate the data
intervals of the producer DAG. I have taken a look at the source code and can
propose a simple solution for this (more on that later).
However, this also poses some issues/questions.
Let's break it down to pros and cons.
Pros:
- in our case, we mostly use data-aware scheduling to break up large
pipelines into multiple DAGs. This would let us be sure that data intervals
are `replicated` across the entire pipeline
Cons:
- what would it mean for situations where multiple DAGs `produce` the same
dataset? These DAGs may have different data intervals, so replicating one of
them doesn't quite make sense. Different consuming modes could be proposed
for these cases.
- the `dataset_dag_run_queue` model in the database may not reflect exactly
what will happen, i.e. the actual DAG runs that will be queued. **I'm not sure.**
Here are some of my findings on what it would mean to add support for a
consumer to replicate the data interval of the producer (again, this makes
sense only if a single producer is updating the dataset):
- change the DAG constructor to accept the mode for consuming dataset events
(default to `default` or name it `simple`)
- introduce an enum-like class (or whatever Airflow uses for defining types)
to define the different consumer modes
- add an `__init__` method to the `DatasetTimetable` class to pass in the
consumer mode
- change the implementation for fetching data intervals to something like
this:
```
def _data_interval_from_event(event: DatasetEvent) -> DataInterval:
    # Replicate the producer run's data interval. Note: the DagRun
    # attributes are data_interval_start/data_interval_end.
    return DataInterval(
        event.source_dag_run.data_interval_start,
        event.source_dag_run.data_interval_end,
    )


def _data_intervals_per_event(
    self,
    logical_date: DateTime,
    events: Collection[DatasetEvent],
) -> Collection[DataInterval]:
    # Always return a collection so the scheduler can iterate uniformly.
    if not events:
        return [DataInterval(logical_date, logical_date)]
    return [_data_interval_from_event(event) for event in events]


def _data_interval_for_event_set(
    self,
    logical_date: DateTime,
    events: Collection[DatasetEvent],
) -> Collection[DataInterval]:
    if not events:
        return [DataInterval(logical_date, logical_date)]
    start = min(
        events, key=operator.attrgetter("source_dag_run.data_interval_start")
    ).source_dag_run.data_interval_start
    end = max(
        events, key=operator.attrgetter("source_dag_run.data_interval_end")
    ).source_dag_run.data_interval_end
    # Wrapped in a list for the same reason as above.
    return [DataInterval(start, end)]


def data_interval_for_events(
    self,
    logical_date: DateTime,
    events: Collection[DatasetEvent],
) -> Collection[DataInterval]:
    if self.event_mode == DatasetConsumerType.NON_DETERMINISTIC:
        return self._data_interval_for_event_set(logical_date, events)
    elif self.event_mode == DatasetConsumerType.DATA_INTERVAL:
        return self._data_intervals_per_event(logical_date, events)
    else:  # this case should not be able to happen
        raise NotImplementedError(f"unknown consumer mode: {self.event_mode}")
```
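For completeness, the enum-like class and constructor change mentioned above could look something like the sketch below. `DatasetConsumerType`, `consume_mode`, and this stripped-down `DatasetTimetable` are hypothetical names for illustration, not existing Airflow API:

```python
import enum


class DatasetConsumerType(str, enum.Enum):
    """Hypothetical modes for consuming queued dataset events."""

    # Current behaviour: coalesce all pending events into one DAG run
    # whose data interval spans the min/max of the source intervals.
    NON_DETERMINISTIC = "non_deterministic"
    # Proposed behaviour: one DAG run per event, replicating the
    # producer run's data interval.
    DATA_INTERVAL = "data_interval"


class DatasetTimetable:
    """Sketch only; the real timetable lives in the Airflow codebase."""

    def __init__(
        self,
        consume_mode: DatasetConsumerType = DatasetConsumerType.NON_DETERMINISTIC,
    ) -> None:
        # Stored under the name the dispatch code above reads.
        self.event_mode = consume_mode
```

A DAG author could then opt in via something like `DAG(..., dataset_consume_mode=DatasetConsumerType.DATA_INTERVAL)` (again, a hypothetical parameter), with the default preserving today's behaviour.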
and change the scheduler job logic to iterate over a list instead of a
single value:
```
data_intervals = dag.timetable.data_interval_for_events(exec_date, dataset_events)
for data_interval in data_intervals:
    run_id = dag.timetable.generate_run_id(
        run_type=DagRunType.DATASET_TRIGGERED,
        logical_date=exec_date,
        data_interval=data_interval,
        session=session,
        events=dataset_events,
    )
    dag_run = dag.create_dagrun(
        run_id=run_id,
        run_type=DagRunType.DATASET_TRIGGERED,
        execution_date=exec_date,
        data_interval=data_interval,
        state=DagRunState.QUEUED,
        external_trigger=False,
        session=session,
        dag_hash=dag_hash,
        creating_job_id=self.job.id,
    )
    Stats.incr("dataset.triggered_dagruns")
    dag_run.consumed_dataset_events.extend(dataset_events)
# Clear the queue once, after all runs for this DAG have been created.
session.query(DatasetDagRunQueue).filter(
    DatasetDagRunQueue.target_dag_id == dag.dag_id
).delete()
```
In the future, one could separate different types of dataset timetables.
Now, I have never submitted anything to Airflow, so I don't know the source
code well, and **there may be issues with this approach that I am not aware of**.
Thoughts?
### Use case/motivation
While working with data-aware scheduling in Airflow, we ran into an "issue"
where multiple dataset events trigger a single DAG run of the consumer DAG.
This is a problem when using the `data_interval_start` and `data_interval_end`
variables in DAG execution, as well as in Jinja-templated queries.
The **problem** presents itself when the producer DAG is running
**during catchup or backfill**.
It seems like the culprit is that multiple dataset events are used to create
a single `DataInterval`, taking the min and max across all of the dataset
events (see the `DatasetTriggeredTimetable` implementation).
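A minimal, self-contained illustration of that coalescing behaviour, with plain datetime pairs standing in for the real `DatasetEvent`/`DataInterval` objects: three hourly producer runs collapse into one three-hour consumer interval, losing the per-run boundaries.

```python
from datetime import datetime

# (start, end) pairs for three producer DAG runs, e.g. during catchup.
producer_intervals = [
    (datetime(2023, 1, 1, 0), datetime(2023, 1, 1, 1)),
    (datetime(2023, 1, 1, 1), datetime(2023, 1, 1, 2)),
    (datetime(2023, 1, 1, 2), datetime(2023, 1, 1, 3)),
]

# What the consumer currently sees: a single interval from the overall
# min start to the overall max end.
coalesced = (
    min(start for start, _ in producer_intervals),
    max(end for _, end in producer_intervals),
)
print(coalesced)  # one 3-hour interval instead of three 1-hour intervals
```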
It would be nice if different types of consumers were supported. For
example, in our use case, we would need the consumer to replicate the data
intervals of the producer DAG. I have taken a look at the source code and can
propose a simple solution for this (see the proposal above).
This would allow breaking up a complex pipeline into multiple DAGs and using
backfill or catchup with more certainty about the end result.
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]