Hi Ash and all,

What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?


I think I wouldn't worry about it right now or maybe not as part of this
AIP. Currently, in one of the Github Issue, a user mentioned that it is not
straightforward to know what is inside the context dictionary-
https://github.com/apache/airflow/issues/14396. So maybe we can tackle this
issue separately once the AbstractTimetable is built.

Should "interval-less DAGs" (ones using "CronTimetable" in my proposal vs
> "DataTimetable") have data_interval_start and end available in the context?


hmm.. I would say No but then it contradicts my suggestion to remove
context dict from this AIP. If we are going to use it in scheduler then
yes, where data_interval_start = data_interval_end from CronTimetable.

Does anyone have any better names than TimeDeltaTimetable, DataTimetable,
> and CronTimetable? (We can probably change these names right up until
> release, so not important to get this correct *now*.)


No strong opinion here. Just an alternate suggestion can
be TimeDeltaSchedule, DataSchedule and CronSchedule


> Should I try to roll AIP-30
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>  in
> to this, or should we make that a future addition? (My vote is for future
> addition)


I would vote for Future addition too.

Regards,
Kaxil

On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> I think, yes, AIP-35 or something like it would happily co-exist with this
> proposal.
>
> @Daniel <dpstand...@gmail.com> and I have been discussing this a bit on
> Slack, and one of the questions he asked was if the concept of
> data_interval should be moved from DagRun as James and I suggested down on
> to the individual task:
>
> suppose i have a new dag hitting 5 api endpoints and pulling data to s3.
> suppose that yesterday 4 of them succeeded but one failed. today, 4 of them
> should pull from yesterday. but the one that failed should pull from 2 days
> back. so even though these normally have the same interval, today they
> should not.
>
>
> My view on this is two fold: one, this should primarily be handled by
> retries on the task, and secondly, having different TaskIstances in the
> same DagRun  have different data intervals would be much harder to reason
> about/design the UI around, so for those reasons I still think interval
> should be a DagRun-level concept.
>
> (He has a stalled AIP-30 where he proposed something to address this kind
> of "watermark" case, which we might pick up next after this AIP is complete)
>
> One thing we might want to do is extend the interface of AbstractTimetable
> to be able to add/update parameters in the context dict, so the interface
> could become this:
>
> class AbstractTimetable(ABC):
>     @abstractmethod
>     def next_dagrun_info(
>         date_last_automated_dagrun: Optional[pendulum.DateTime],
>
>         session: Session,
>     ) -> Optional[DagRunInfo]:
>         """
>         Get information about the next DagRun of this dag after
> ``date_last_automated_dagrun`` -- the
>         execution date, and the earliest it could be scheduled
>
>         :param date_last_automated_dagrun: The max(execution_date) of
> existing
>             "automated" DagRuns for this dag (scheduled or backfill, but
> not
>             manual)
>         """
>
>     @abstractmethod
>     def set_context_variables(self, dagrun: DagRun, context: Dict[str, Any])
> -> None:
>         """
>         Update or set new context variables to become available in task
> templates and operators.
>         """
>
>
> What do people think of this? Worth it? Too complex to reason about what
> context variables might exist as a result?
>
> *Outstanding question*:
>
>    - Should "interval-less DAGs" (ones using "CronTimetable" in my
>    proposal vs "DataTimetable") have data_interval_start and end available in
>    the context?
>    - Does anyone have any better names than TimeDeltaTimetable,
>    DataTimetable, and CronTimetable? (We can probably change these names right
>    up until release, so not important to get this correct *now*.)
>    - Should I try to roll AIP-30
>    
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>    in to this, or should we make that a future addition? (My vote is for
>    future addition)
>
>
> I'd like to start voting on this AIP next week (probably on Tuesday) as I
> think this will be a powerful feature that eases confusing to new users.
>
> -Ash
>
>
> On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinh...@yandex.com> wrote:
>
> Is this AIP going to co-exist with AIP-35 "Add Signal Based Scheduling To
> Airflow"?
> I think streaming was also discussed there (though it wasn't really the
> use case).
>
>
> 02.03.2021, 22:10, "Ash Berlin-Taylor" <a...@apache.org>:
>
> Hi Kevin,
>
> Interesting idea. My original idea was actually for "interval-less DAGs"
> (i.e. ones where it's just "run at this time") would not have
> data_interval_start or end, but (while drafting the AIP) we decided that it
> was probably "easier" if those values were always datetimes.
>
> That said, I think having the DB model have those values be nullable would
> future proof it without needing another migration to change it. Do you
> think this is worth doing now?
>
> I haven't (yet! It's on my list) spent any significant time thinking about
> how to make Airflow play nicely with streaming jobs. If anyone else has
> ideas here please share them
>
> -ash
>
> On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrql...@gmail.com> wrote:
>
> Hi Ash and James,
>
> This is an exciting move. What do you think about using this opportunity
> to extend Airflow's support to streaming like use cases? I.e DAGs/tasks
> that want to run forever like a service. For such use cases, schedule
> interval might not be meaningful, then do we want to make the date interval
> param optional to DagRun and task instances? That sounds like a pretty
> major change to the underlying model of Airflow, but this AIP is so far the
> best opportunity I saw that can level up Airflow's support for
> streaming/service use cases.
>
>
> Cheers,
> Kevin Y
>
> On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish <dpstand...@gmail.com>
> wrote:
>
> Very excited to see this proposal come through and love the direction this
> has gone.
>
> Couple comments...
>
> *Tree view / Data completeness view*
>
> When you design your tasks with the canonical idempotence pattern, the
> tree view shows you both data completeness and task execution history
> (success / failure etc).
>
> When you don't use that pattern (which is my general preference), tree
> view is only task execution history.
>
> This change has the potential to unlock a data completeness view for
> canonical tasks.  It's possible that the "data completeness view" can
> simply be the tree view.  I.e. somehow it can use these new classes to know
> what data was successfully filled and what data wasn't.
>
> To the extent we like the idea of either extending / plugging / modifying
> tree view, or adding a distinct data completeness view, we might want to
> anticipate the needs of that in this change.  And maybe no alteration to
> the proposal would be needed but just want to throw the idea out there.
>
> *Watermark workflow / incremental processing*
>
> A common pattern in data warehousing is pulling data incrementally from a
> source.
>
> A standard way to achieve this is at the start of the task, select max
> `updated_at` in source table and hold on to that value for a minute.  This
> is your tentative new high watermark.
> Now it's time to pull your data.  If your task ran before, grab last high
> watermark.  If not, use initial load value.
> If successful, update high watermark.
>
> On my team we implemented this with a stateful tasks / stateful processes
> concept (there's a dormant draft AIP here
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
> and a WatermarkOperator that handled the boilerplate*.
>
> Again here, I don't have a specific suggestion at this moment.  But I
> wanted to articulate this workflow because it is common and it wasn't
> immediately obvious to me in reading AIP-39 how I would use it to implement
> it.
>
> AIP-39 makes airflow more data-aware.  So if it can support this kind of
> workflow that's great.  @Ash Berlin-Taylor <a...@astronomer.io> do you
> have thoughts on how it might be compatible with this kind of thing as is?
>
> ---
>
> * The base operator is designed so that Subclasses only need to implement
> two methods:
>     - `get_high_watermark`: produce the tentative new high watermark
>     ' `watermark_execute`: analogous to implementing poke in a sensor,
> this is where your work is done. `execute` is left to the base class, and
> it orchestrates (1) getting last high watermark or inital load value and
> (2) updating new high watermark if job successful.
>
>

Reply via email to