Yup, I have no strong opinions on either, so happy to keep it TimeTable, or to go with another suggestion if there is one.
Regards,
Kaxil

On Thu, Mar 11, 2021 at 5:00 PM James Timmins <ja...@astronomer.io.invalid> wrote:

> Respectfully, I strongly disagree with the renaming of Timetable to Schedule. Schedule and Scheduler aren't meaningfully different, which can lead to a lot of confusion. Even as a native English speaker, and someone who works on Airflow full time, I routinely need to ask for clarification about what schedule-related concept someone is referring to. I foresee Schedule and Scheduler as two distinct yet closely related concepts becoming a major source of confusion.
>
> If folks dislike Timetable, we could certainly change to something else, but let's not use something so similar to existing Airflow classes.
>
> -James
>
> On Thu, Mar 11, 2021 at 2:13 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>
>> Summary of changes so far on the AIP:
>>
>> My proposed rename of DagRun.execution_date is now DagRun.schedule_date (previously I had proposed run_date. Thanks dstandish!)
>>
>> Timetable classes are renamed to Schedule classes (CronSchedule etc.), and similarly the DAG argument is now schedule (reminder: schedule_interval will not be removed or deprecated, and will still be the way to use "simple" expressions).
>>
>> -ash
>>
>> On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <a...@apache.org> wrote:
>>
>> Could change Timetable to Schedule -- that would mean the DAG arg becomes `schedule=CronSchedule(...)` -- a bit close to the current `schedule_interval`, but I think a clear enough difference.
>>
>> I do like the name, but my one worry with "schedule" is that Scheduler and Schedule are very similar, and might be confused with each other by non-native English speakers? (I defer to others' judgment here, as this is not something I can experience myself.)
>>
>> @Kevin Yang <yrql...@gmail.com> @Daniel Standish <dpstand...@gmail.com> any final input on this AIP?
>>
>> On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <kaxiln...@gmail.com> wrote:
>>
>> Hi Ash and all,
>>
>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
>>
>> I think I wouldn't worry about it right now, or maybe not as part of this AIP. Currently, in one of the GitHub issues, a user mentioned that it is not straightforward to know what is inside the context dictionary: https://github.com/apache/airflow/issues/14396. So maybe we can tackle this issue separately once the AbstractTimetable is built.
>>
>>> Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>
>> Hmm.. I would say no, but then that contradicts my suggestion to remove the context dict from this AIP. If we are going to use it in the scheduler then yes, where data_interval_start = data_interval_end from CronTimetable.
>>
>>> Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct *now*.)
>>
>> No strong opinion here. Just an alternate suggestion: they could be TimeDeltaSchedule, DataSchedule and CronSchedule.
>>
>>> Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in to this, or should we make that a future addition? (My vote is for future addition)
>>
>> I would vote for future addition too.
>>
>> Regards,
>> Kaxil
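(For illustration: under the renames proposed above, a DAG definition would look roughly like the sketch below. The `schedule=CronSchedule(...)` argument is the form discussed in this thread; the import path and constructor signature are assumptions, not a settled API.)

    import pendulum

    from airflow import DAG
    # Assumed import path -- the AIP does not pin down where the
    # Schedule classes would live.
    from airflow.timetables import CronSchedule

    with DAG(
        dag_id="example_cron_schedule",
        # The proposed new-style argument; the existing
        # schedule_interval="0 6 * * *" form would keep working unchanged.
        schedule=CronSchedule("0 6 * * *"),
        start_date=pendulum.datetime(2021, 3, 1, tz="UTC"),
    ) as dag:
        ...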
>> On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>
>>> I think, yes, AIP-35 or something like it would happily co-exist with this proposal.
>>>
>>> @Daniel <dpstand...@gmail.com> and I have been discussing this a bit on Slack, and one of the questions he asked was whether the concept of data_interval should be moved from DagRun, as James and I suggested, down on to the individual task:
>>>
>>> suppose i have a new dag hitting 5 api endpoints and pulling data to s3. suppose that yesterday 4 of them succeeded but one failed. today, 4 of them should pull from yesterday. but the one that failed should pull from 2 days back. so even though these normally have the same interval, today they should not.
>>>
>>> My view on this is twofold: one, this should primarily be handled by retries on the task, and secondly, having different TaskInstances in the same DagRun have different data intervals would be much harder to reason about/design the UI around, so for those reasons I still think interval should be a DagRun-level concept.
>>>
>>> (He has a stalled AIP-30 where he proposed something to address this kind of "watermark" case, which we might pick up next after this AIP is complete.)
>>>
>>> One thing we might want to do is extend the interface of AbstractTimetable to be able to add/update parameters in the context dict, so the interface could become this:
>>>
>>> class AbstractTimetable(ABC):
>>>     @abstractmethod
>>>     def next_dagrun_info(
>>>         self,
>>>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>>>         session: Session,
>>>     ) -> Optional[DagRunInfo]:
>>>         """
>>>         Get information about the next DagRun of this dag after
>>>         ``date_last_automated_dagrun`` -- the execution date, and the
>>>         earliest it could be scheduled.
>>>
>>>         :param date_last_automated_dagrun: The max(execution_date) of
>>>             existing "automated" DagRuns for this dag (scheduled or
>>>             backfill, but not manual)
>>>         """
>>>
>>>     @abstractmethod
>>>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
>>>         """
>>>         Update or set new context variables to become available in task
>>>         templates and operators.
>>>         """
>>>
>>> What do people think of this? Worth it? Too complex to reason about what context variables might exist as a result?
>>>
>>> *Outstanding questions*:
>>>
>>> - Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs "DataTimetable") have data_interval_start and end available in the context?
>>> - Does anyone have any better names than TimeDeltaTimetable, DataTimetable, and CronTimetable? (We can probably change these names right up until release, so not important to get this correct *now*.)
>>> - Should I try to roll AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> in to this, or should we make that a future addition? (My vote is for future addition)
>>>
>>> I'd like to start voting on this AIP next week (probably on Tuesday), as I think this will be a powerful feature that eases confusion for new users.
>>>
>>> -Ash
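(To make the interface above concrete, here is a minimal sketch of one possible implementation, using the TimeDeltaTimetable name from the thread. AbstractTimetable, DagRunInfo, DagRun and Session are taken from the interface sketch above, and the DagRunInfo fields shown here are assumptions based on the AIP discussion, not a settled API.)

    import datetime
    from typing import Any, Dict, Optional

    import pendulum

    class TimeDeltaTimetable(AbstractTimetable):
        """Sketch: each DagRun covers a fixed-width data interval."""

        def __init__(self, delta: datetime.timedelta) -> None:
            self.delta = delta

        def next_dagrun_info(
            self,
            date_last_automated_dagrun: Optional[pendulum.DateTime],
            session: Session,
        ) -> Optional[DagRunInfo]:
            # The first automated run would be derived from the DAG's
            # start_date; elided here to keep the sketch short.
            if date_last_automated_dagrun is None:
                return None
            start = date_last_automated_dagrun + self.delta
            # Assumed DagRunInfo shape: the run covers [start, start + delta)
            # and can be scheduled once that interval has fully elapsed.
            return DagRunInfo(
                data_interval_start=start,
                data_interval_end=start + self.delta,
                earliest_schedulable=start + self.delta,
            )

        def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any]) -> None:
            # Expose the interval to task templates and operators.
            context["data_interval_start"] = dagrun.data_interval_start
            context["data_interval_end"] = dagrun.data_interval_end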
>>> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinh...@yandex.com> wrote:
>>>
>>> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To Airflow"? I think streaming was also discussed there (though it wasn't really the use case).
>>>
>>> 02.03.2021, 22:10, "Ash Berlin-Taylor" <a...@apache.org>:
>>>
>>> Hi Kevin,
>>>
>>> Interesting idea. My original idea was actually that "interval-less DAGs" (i.e. ones where it's just "run at this time") would not have data_interval_start or end, but (while drafting the AIP) we decided that it was probably "easier" if those values were always datetimes.
>>>
>>> That said, I think having the DB model make those values nullable would future-proof it without needing another migration to change it. Do you think this is worth doing now?
>>>
>>> I haven't (yet! It's on my list) spent any significant time thinking about how to make Airflow play nicely with streaming jobs. If anyone else has ideas here, please share them.
>>>
>>> -ash
>>>
>>> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrql...@gmail.com> wrote:
>>>
>>> Hi Ash and James,
>>>
>>> This is an exciting move. What do you think about using this opportunity to extend Airflow's support to streaming-like use cases? I.e. DAGs/tasks that want to run forever, like a service. For such use cases, a schedule interval might not be meaningful, so do we want to make the date interval param optional for DagRun and task instances? That sounds like a pretty major change to the underlying model of Airflow, but this AIP is so far the best opportunity I've seen to level up Airflow's support for streaming/service use cases.
>>>
>>> Cheers,
>>> Kevin Y
>>>
>>> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstand...@gmail.com> wrote:
>>>
>>> Very excited to see this proposal come through, and I love the direction this has gone.
>>>
>>> Couple comments...
>>>
>>> *Tree view / Data completeness view*
>>>
>>> When you design your tasks with the canonical idempotence pattern, the tree view shows you both data completeness and task execution history (success / failure etc).
>>>
>>> When you don't use that pattern (which is my general preference), tree view is only task execution history.
>>>
>>> This change has the potential to unlock a data completeness view for canonical tasks. It's possible that the "data completeness view" can simply be the tree view, i.e. somehow it can use these new classes to know what data was successfully filled and what data wasn't.
>>>
>>> To the extent we like the idea of either extending / plugging / modifying the tree view, or adding a distinct data completeness view, we might want to anticipate the needs of that in this change. Maybe no alteration to the proposal is needed, but I just want to throw the idea out there.
>>>
>>> *Watermark workflow / incremental processing*
>>>
>>> A common pattern in data warehousing is pulling data incrementally from a source.
>>>
>>> A standard way to achieve this is: at the start of the task, select max `updated_at` in the source table and hold on to that value for a minute. This is your tentative new high watermark. Now it's time to pull your data. If your task ran before, grab the last high watermark. If not, use the initial load value. If successful, update the high watermark.
>>>
>>> On my team we implemented this with a stateful tasks / stateful processes concept (there's a dormant draft AIP here <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>) and a WatermarkOperator that handled the boilerplate*.
>>>
>>> Again here, I don't have a specific suggestion at this moment.
>>> But I wanted to articulate this workflow because it is common, and it wasn't immediately obvious to me when reading AIP-39 how I would use it to implement this.
>>>
>>> AIP-39 makes Airflow more data-aware, so if it can support this kind of workflow, that's great. @Ash Berlin-Taylor <a...@astronomer.io> do you have thoughts on how it might be compatible with this kind of thing as is?
>>>
>>> ---
>>>
>>> * The base operator is designed so that subclasses only need to implement two methods:
>>> - `get_high_watermark`: produce the tentative new high watermark
>>> - `watermark_execute`: analogous to implementing poke in a sensor, this is where your work is done. `execute` is left to the base class, and it orchestrates (1) getting the last high watermark or initial load value and (2) updating the new high watermark if the job is successful.
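(For readers unfamiliar with the pattern, a rough sketch of the base operator described in the footnote might look like the following. All names are illustrative, and Airflow Variables are used purely as a stand-in state store for the AIP-30 style state persistence, which does not exist in Airflow today.)

    from typing import Any

    from airflow.models import BaseOperator, Variable

    class WatermarkOperator(BaseOperator):
        """Sketch of the watermark pattern described above.

        Subclasses implement get_high_watermark() and watermark_execute();
        execute() stays in the base class and handles the boilerplate.
        """

        def __init__(self, *, initial_load_value: Any, **kwargs) -> None:
            super().__init__(**kwargs)
            self.initial_load_value = initial_load_value

        def get_high_watermark(self, context) -> Any:
            """Produce the tentative new high watermark, e.g. max(updated_at)."""
            raise NotImplementedError

        def watermark_execute(self, context, low: Any, high: Any) -> None:
            """Pull the data in (low, high]; analogous to a sensor's poke()."""
            raise NotImplementedError

        def execute(self, context) -> None:
            # (1) Last committed watermark, or the initial load value on the
            # first ever run.
            low = self._load_watermark() or self.initial_load_value
            # Capture the tentative high watermark *before* pulling, so rows
            # arriving mid-run are picked up by the next run rather than lost.
            high = self.get_high_watermark(context)
            self.watermark_execute(context, low, high)
            # (2) Only commit the new watermark once the pull succeeded; a
            # failed run leaves `low` untouched for the retry.
            self._save_watermark(high)

        def _load_watermark(self) -> Any:
            # Variables store strings; a real implementation would need
            # to (de)serialize the watermark type properly.
            return Variable.get(f"watermark_{self.task_id}", default_var=None)

        def _save_watermark(self, value: Any) -> None:
            Variable.set(f"watermark_{self.task_id}", str(value))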