Hi Ash and all,
> What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?

I think I wouldn't worry about it right now, or maybe not as part of this
AIP. Currently, in one of the GitHub issues, a user mentioned that it is not
straightforward to know what is inside the context dictionary:
https://github.com/apache/airflow/issues/14396. So maybe we can tackle this
issue separately once the AbstractTimetable is built.

> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
> "DataTimetable") have data_interval_start and end available in the context?

Hmm.. I would say no, but then that contradicts my suggestion to remove the
context dict from this AIP. If we are going to use it in the scheduler then
yes, where data_interval_start = data_interval_end for CronTimetable.

> Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
> and CronTimetable? (We can probably change these names right up until
> release, so not important to get this correct *now*.)

No strong opinion here. An alternate suggestion could be TimeDeltaSchedule,
DataSchedule and CronSchedule.

> Should I try to roll AIP-30
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
> in to this, or should we make that a future addition? (My vote is for
> future addition)

I would vote for future addition too.

Regards,
Kaxil

On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> I think, yes, AIP-35 or something like it would happily co-exist with this
> proposal.
>
> @Daniel <dpstand...@gmail.com> and I have been discussing this a bit on
> Slack, and one of the questions he asked was if the concept of
> data_interval should be moved from DagRun, as James and I suggested, down
> on to the individual task:
>
>     suppose i have a new dag hitting 5 api endpoints and pulling data to
>     s3. suppose that yesterday 4 of them succeeded but one failed. today,
>     4 of them should pull from yesterday. but the one that failed should
>     pull from 2 days back. so even though these normally have the same
>     interval, today they should not.
>
> My view on this is twofold: one, this should primarily be handled by
> retries on the task, and secondly, having different TaskInstances in the
> same DagRun have different data intervals would be much harder to reason
> about/design the UI around, so for those reasons I still think interval
> should be a DagRun-level concept.
>
> (He has a stalled AIP-30 where he proposed something to address this kind
> of "watermark" case, which we might pick up next after this AIP is
> complete.)
>
> One thing we might want to do is extend the interface of AbstractTimetable
> to be able to add/update parameters in the context dict, so the interface
> could become this:
>
>     class AbstractTimetable(ABC):
>         @abstractmethod
>         def next_dagrun_info(
>             self,
>             date_last_automated_dagrun: Optional[pendulum.DateTime],
>             session: Session,
>         ) -> Optional[DagRunInfo]:
>             """
>             Get information about the next DagRun of this dag after
>             ``date_last_automated_dagrun`` -- the execution date, and the
>             earliest it could be scheduled.
>
>             :param date_last_automated_dagrun: The max(execution_date) of
>                 existing "automated" DagRuns for this dag (scheduled or
>                 backfill, but not manual)
>             """
>
>         @abstractmethod
>         def set_context_variables(
>             self, dagrun: DagRun, context: Dict[str, Any]
>         ) -> None:
>             """
>             Update or set new context variables to become available in
>             task templates and operators.
>             """
>
> What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?
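To make that hook a little more concrete, here is a rough sketch of how a
cron-style timetable could implement it so that data_interval_start ==
data_interval_end. This is only an illustration of the idea above; the class
name and the DagRun attribute used are assumptions, not part of the
proposal:

    from typing import Any, Dict

    # Assumes the AbstractTimetable / DagRun types from the interface
    # sketched above; next_dagrun_info is omitted for brevity.
    class CronTimetable(AbstractTimetable):
        """'Run at this time' timetable: there is no real data interval."""

        def set_context_variables(
            self, dagrun: DagRun, context: Dict[str, Any]
        ) -> None:
            # For interval-less DAGs the interval collapses to a point, so
            # both context values are the run's trigger/execution date.
            context["data_interval_start"] = dagrun.execution_date
            context["data_interval_end"] = dagrun.execution_date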
>
> *Outstanding questions*:
>
> - Should "interval-less DAGs" (ones using "CronTimetable" in my proposal
>   vs "DataTimetable") have data_interval_start and end available in the
>   context?
> - Does anyone have any better names than TimeDeltaTimetable,
>   DataTimetable, and CronTimetable? (We can probably change these names
>   right up until release, so not important to get this correct *now*.)
> - Should I try to roll AIP-30
>   <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>   in to this, or should we make that a future addition? (My vote is for
>   future addition.)
>
> I'd like to start voting on this AIP next week (probably on Tuesday) as I
> think this will be a powerful feature that eases confusion for new users.
>
> -Ash
>
>
> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinh...@yandex.com> wrote:
>
> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
> Airflow"?
> I think streaming was also discussed there (though it wasn't really the
> use case).
>
>
> 02.03.2021, 22:10, "Ash Berlin-Taylor" <a...@apache.org>:
>
> Hi Kevin,
>
> Interesting idea. My original idea was actually that "interval-less DAGs"
> (i.e. ones where it's just "run at this time") would not have
> data_interval_start or end, but (while drafting the AIP) we decided that
> it was probably "easier" if those values were always datetimes.
>
> That said, I think having the DB model have those values be nullable would
> future-proof it without needing another migration to change it. Do you
> think this is worth doing now?
>
> I haven't (yet! It's on my list) spent any significant time thinking about
> how to make Airflow play nicely with streaming jobs. If anyone else has
> ideas here please share them.
>
> -ash
>
> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrql...@gmail.com> wrote:
>
> Hi Ash and James,
>
> This is an exciting move. What do you think about using this opportunity
> to extend Airflow's support to streaming-like use cases? I.e. DAGs/tasks
> that want to run forever like a service. For such use cases, a schedule
> interval might not be meaningful, so do we want to make the date interval
> param optional for DagRun and task instances? That sounds like a pretty
> major change to the underlying model of Airflow, but this AIP is so far
> the best opportunity I have seen to level up Airflow's support for
> streaming/service use cases.
>
>
> Cheers,
> Kevin Y
>
> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstand...@gmail.com>
> wrote:
>
> Very excited to see this proposal come through and love the direction
> this has gone.
>
> Couple comments...
>
> *Tree view / Data completeness view*
>
> When you design your tasks with the canonical idempotence pattern, the
> tree view shows you both data completeness and task execution history
> (success / failure etc).
>
> When you don't use that pattern (which is my general preference), tree
> view is only task execution history.
>
> This change has the potential to unlock a data completeness view for
> canonical tasks. It's possible that the "data completeness view" can
> simply be the tree view, i.e. somehow it can use these new classes to
> know what data was successfully filled and what data wasn't.
>
> To the extent we like the idea of either extending / plugging / modifying
> tree view, or adding a distinct data completeness view, we might want to
> anticipate the needs of that in this change.
> And maybe no alteration to the proposal would be needed, but I just want
> to throw the idea out there.
>
> *Watermark workflow / incremental processing*
>
> A common pattern in data warehousing is pulling data incrementally from a
> source.
>
> A standard way to achieve this is: at the start of the task, select max
> `updated_at` in the source table and hold on to that value for a minute.
> This is your tentative new high watermark.
> Now it's time to pull your data. If your task ran before, grab the last
> high watermark. If not, use the initial load value.
> If successful, update the high watermark.
>
> On my team we implemented this with a stateful tasks / stateful processes
> concept (there's a dormant draft AIP here
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
> and a WatermarkOperator that handled the boilerplate*.
>
> Again here, I don't have a specific suggestion at this moment. But I
> wanted to articulate this workflow because it is common, and it wasn't
> immediately obvious to me in reading AIP-39 how I would use it to
> implement this.
>
> AIP-39 makes Airflow more data-aware. So if it can support this kind of
> workflow, that's great. @Ash Berlin-Taylor <a...@astronomer.io> do you
> have thoughts on how it might be compatible with this kind of thing as is?
>
> ---
>
> * The base operator is designed so that subclasses only need to implement
> two methods:
>   - `get_high_watermark`: produce the tentative new high watermark
>   - `watermark_execute`: analogous to implementing poke in a sensor, this
>     is where your work is done. `execute` is left to the base class, and
>     it orchestrates (1) getting the last high watermark or initial load
>     value and (2) updating the new high watermark if the job is
>     successful.
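For anyone who hasn't used the pattern Daniel describes, a rough sketch of
what such a base operator can look like is below. This is only an
illustration of the workflow, not his actual implementation: the use of an
Airflow Variable as the state store and every name in it are placeholders
(the real version would build on the AIP-30 state-persistence concept).

    from airflow.models import BaseOperator, Variable

    class WatermarkOperator(BaseOperator):
        """Base operator handling the incremental-load boilerplate."""

        def __init__(self, *, initial_load_value, **kwargs):
            super().__init__(**kwargs)
            self.initial_load_value = initial_load_value

        def get_high_watermark(self):
            """Produce the tentative new high watermark, e.g. max `updated_at` in the source."""
            raise NotImplementedError

        def watermark_execute(self, context, low_watermark, high_watermark):
            """Do the actual work for the (low_watermark, high_watermark] window."""
            raise NotImplementedError

        def execute(self, context):
            state_key = f"watermark.{self.dag_id}.{self.task_id}"
            # Last committed watermark if the task ran before, else the initial load value.
            low_watermark = Variable.get(state_key, default_var=self.initial_load_value)
            # Capture the tentative new high watermark before pulling any data.
            high_watermark = self.get_high_watermark()
            self.watermark_execute(context, low_watermark, high_watermark)
            # Only commit the new watermark once the work above has succeeded.
            Variable.set(state_key, high_watermark)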