Hi Ash and James,

This is an exciting move. What do you think about using this opportunity to
extend Airflow's support to streaming-like use cases, i.e. DAGs/tasks that
want to run forever, like a service? For such use cases a schedule interval
might not be meaningful, so would we want to make the date interval
parameters optional on DagRun and task instances? That sounds like a pretty
major change to the underlying model of Airflow, but this AIP is the best
opportunity I've seen so far to level up Airflow's support for
streaming/service use cases.
Cheers,
Kevin Y

On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstand...@gmail.com> wrote:

> Very excited to see this proposal come through and love the direction this
> has gone.
>
> A couple of comments...
>
> *Tree view / data completeness view*
>
> When you design your tasks with the canonical idempotence pattern, the
> tree view shows you both data completeness and task execution history
> (success / failure etc).
>
> When you don't use that pattern (which is my general preference), the tree
> view is only task execution history.
>
> This change has the potential to unlock a data completeness view for
> canonical tasks. It's possible that the "data completeness view" can
> simply be the tree view, i.e. somehow it can use these new classes to know
> what data was successfully filled and what data wasn't.
>
> To the extent we like the idea of either extending / plugging into /
> modifying the tree view, or adding a distinct data completeness view, we
> might want to anticipate the needs of that in this change. Maybe no
> alteration to the proposal is needed, but I want to throw the idea out
> there.
>
> *Watermark workflow / incremental processing*
>
> A common pattern in data warehousing is pulling data incrementally from a
> source.
>
> A standard way to achieve this: at the start of the task, select the max
> `updated_at` in the source table and hold on to that value; this is your
> tentative new high watermark. Now it's time to pull your data: if your
> task ran before, grab the last high watermark; if not, use an initial load
> value. If the task succeeds, update the high watermark.
>
> On my team we implemented this with a stateful tasks / stateful processes
> concept (there's a dormant draft AIP here
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
> and a WatermarkOperator that handled the boilerplate*.
>
> Again, I don't have a specific suggestion at this moment, but I wanted to
> articulate this workflow because it is common, and it wasn't immediately
> obvious to me from reading AIP-39 how I would use it to implement this
> pattern.
>
> AIP-39 makes Airflow more data-aware, so it would be great if it can
> support this kind of workflow. @Ash Berlin-Taylor <a...@astronomer.io> do
> you have thoughts on how it might be compatible with this kind of thing
> as is?
>
> ---
>
> * The base operator is designed so that subclasses only need to implement
> two methods:
>   - `get_high_watermark`: produce the tentative new high watermark
>   - `watermark_execute`: analogous to implementing `poke` in a sensor,
>     this is where your work is done. `execute` is left to the base class,
>     and it orchestrates (1) getting the last high watermark or initial
>     load value and (2) updating the new high watermark if the job
>     succeeds.
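[Editor's note: for readers unfamiliar with the pattern described above, here
is a minimal sketch of what such a WatermarkOperator base class might look
like. It is illustrative only, not the implementation from Daniel's team: it
assumes the watermark state is persisted in an Airflow Variable keyed per
task (AIP-30 proposes a first-class state mechanism instead), and the
`initial_load_value` parameter and method signatures are hypothetical,
inferred from the email.]

```python
# Minimal sketch of the watermark workflow described above.
# Assumptions: state lives in an Airflow Variable (AIP-30 would offer a
# first-class alternative); parameter and method signatures are hypothetical.

from airflow.models import BaseOperator, Variable


class WatermarkOperator(BaseOperator):
    """Base operator that handles the high-watermark boilerplate.

    Subclasses implement only `get_high_watermark` and `watermark_execute`.
    """

    def __init__(self, *, initial_load_value, **kwargs):
        super().__init__(**kwargs)
        self.initial_load_value = initial_load_value

    @property
    def _state_key(self):
        # One watermark per task; a Variable stands in for a real state store.
        return f"watermark__{self.dag_id}__{self.task_id}"

    def get_high_watermark(self):
        """Produce the tentative new high watermark, e.g. the max
        `updated_at` in the source table. Implemented by subclasses."""
        raise NotImplementedError

    def watermark_execute(self, context, low, high):
        """Do the actual work for the (low, high] window; analogous to
        implementing `poke` in a sensor. Implemented by subclasses."""
        raise NotImplementedError

    def execute(self, context):
        # (1) Capture the tentative new high watermark before pulling data.
        high = self.get_high_watermark()
        # (2) Last high watermark if the task ran before, else initial value.
        low = Variable.get(self._state_key, default_var=self.initial_load_value)
        # (3) Do the work; any exception here prevents the watermark update.
        self.watermark_execute(context, low, high)
        # (4) Only on success, persist the new high watermark.
        Variable.set(self._state_key, high)
```

The key design point is that the tentative watermark is captured before the
pull, so rows written between the pull and the next run are never skipped;
at worst a small window is re-read, which the canonical idempotence pattern
tolerates.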