Summary of changes so far on the AIP:
The proposed new name for DagRun.execution_date is now
DagRun.schedule_date (previously I had proposed run_date -- thanks
dstandish!)
Timetable classes are renamed to Schedule classes (CronSchedule etc),
similarly the DAG argument is now schedule (reminder: schedule_interval
will not be removed or deprecated, and will still be the way to use
"simple" expressions)
-ash
On Wed, 10 Mar, 2021 at 14:15, Ash Berlin-Taylor <a...@apache.org> wrote:
Could change Timetable to Schedule -- that would mean the DAG arg
becomes `schedule=CronSchedule(...)` -- a bit close to the current
`schedule_interval`, but I think the difference is clear enough.
I do like the name, but my one worry with "schedule" is that Scheduler
and Schedule are very similar, and might be confused with each
other by non-native English speakers? (I defer to others' judgment
here, as this is not something I can experience myself.)
@Kevin Yang <mailto:yrql...@gmail.com> @Daniel Standish
<mailto:dpstand...@gmail.com> any final input on this AIP?
On Tue, 9 Mar, 2021 at 16:59, Kaxil Naik <kaxiln...@gmail.com> wrote:
Hi Ash and all,
What do people think of this? Worth it? Too complex to reason about
what context variables might exist as a result?
I think I wouldn't worry about it right now, or at least not as part
of this AIP. Currently, in one of the GitHub issues, a user mentioned
that it is not straightforward to know what is inside the context
dictionary <https://github.com/apache/airflow/issues/14396>. So
maybe we can tackle this issue separately once the AbstractTimetable
is built.
Should "interval-less DAGs" (ones using "CronTimetable" in my
proposal vs "DataTimetable") have data_interval_start and end
available in the context?
Hmm.. I would say no, but then that contradicts my suggestion to
remove the context dict changes from this AIP. If we are going to use
it in the scheduler, then yes, with data_interval_start =
data_interval_end coming from CronTimetable.
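To make that concrete, a point-in-time timetable could return a
zero-width interval (a sketch only -- the DagRunInfo field names are
my guess at the proposal, not settled API):

    # Hypothetical CronTimetable.next_dagrun_info: both interval bounds
    # are the fire time, so data_interval_start == data_interval_end.
    def next_dagrun_info(self, date_last_automated_dagrun, session):
        next_time = self._next_fire_time(date_last_automated_dagrun)  # illustrative helper
        if next_time is None:
            return None
        return DagRunInfo(
            schedule_date=next_time,
            data_interval_start=next_time,
            data_interval_end=next_time,
        )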
Does anyone have any better names than TimeDeltaTimetable,
DataTimetable, and CronTimetable? (We can probably change these
names right up until release, so not important to get this correct
/now/.)
No strong opinion here. Just an alternative suggestion:
TimeDeltaSchedule, DataSchedule and CronSchedule.
Should I try to roll AIP-30
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
in to this, or should we make that a future addition? (My vote is
for future addition)
I would vote for Future addition too.
Regards,
Kaxil
On Sat, Mar 6, 2021 at 11:05 AM Ash Berlin-Taylor <a...@apache.org
<mailto:a...@apache.org>> wrote:
I think, yes, AIP-35 or something like it would happily co-exist
with this proposal.
@Daniel <mailto:dpstand...@gmail.com> and I have been discussing
this a bit on Slack, and one of the questions he asked was whether
the concept of data_interval should be moved from the DagRun (as
James and I suggested) down to the individual task:
suppose I have a new DAG hitting 5 API endpoints and pulling data
to S3, and suppose that yesterday 4 of them succeeded but one
failed. Today, 4 of them should pull from yesterday, but the one
that failed should pull from 2 days back. So even though these
normally have the same interval, today they should not.
My view on this is twofold: first, this should primarily be handled
by retries on the task; and second, having different TaskInstances
in the same DagRun with different data intervals would be much
harder to reason about and design the UI around. So for those
reasons I still think the interval should be a DagRun-level concept.
(He has a stalled AIP-30 where he proposed something to address
this kind of "watermark" case, which we might pick up next after
this AIP is complete)
One thing we might want to do is extend the interface of
AbstractTimetable to be able to add/update parameters in the
context dict, so the interface could become this:
    from abc import ABC, abstractmethod
    from typing import Any, Dict, Optional

    import pendulum
    from sqlalchemy.orm import Session

    # DagRun is the existing model; DagRunInfo is the new class
    # proposed in this AIP.
    from airflow.models import DagRun

    class AbstractTimetable(ABC):
        @abstractmethod
        def next_dagrun_info(
            self,
            date_last_automated_dagrun: Optional[pendulum.DateTime],
            session: Session,
        ) -> Optional["DagRunInfo"]:
            """
            Get information about the next DagRun of this dag after
            ``date_last_automated_dagrun`` -- the execution date, and
            the earliest it could be scheduled.

            :param date_last_automated_dagrun: The max(execution_date)
                of existing "automated" DagRuns for this dag (scheduled
                or backfill, but not manual)
            """

        @abstractmethod
        def set_context_variables(
            self, dagrun: DagRun, context: Dict[str, Any]
        ) -> None:
            """
            Update or set new context variables to become available in
            task templates and operators.
            """
What do people think of this? Worth it? Too complex to reason about
what context variables might exist as a result?
*Outstanding questions*:
- Should "interval-less DAGs" (ones using "CronTimetable" in my
proposal vs "DataTimetable") have data_interval_start and end
available in the context?
- Does anyone have any better names than TimeDeltaTimetable,
DataTimetable, and CronTimetable? (We can probably change these
names right up until release, so not important to get this correct
/now/.)
- Should I try to roll AIP-30
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
in to this, or should we make that a future addition? (My vote is
for future addition)
I'd like to start voting on this AIP next week (probably on
Tuesday), as I think this will be a powerful feature that eases
confusion for new users.
On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinh...@yandex.com
<mailto:alexinh...@yandex.com>> wrote:
Is this AIP going to co-exist with AIP-35 "Add Signal Based
Scheduling To Airflow"?
I think streaming was also discussed there (though it wasn't
really the use case).
02.03.2021, 22:10, "Ash Berlin-Taylor" <a...@apache.org
<mailto:a...@apache.org>>:
Hi Kevin,
Interesting idea. My original idea was actually that
"interval-less DAGs" (i.e. ones where it's just "run at this
time") would not have data_interval_start or end, but (while
drafting the AIP) we decided that it was probably "easier" if
those values were always datetimes.
That said, I think making those values nullable in the DB model
would future-proof it without needing another migration later. Do
you think this is worth doing now?
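(For concreteness, a sketch of that nullability on the model --
column names assumed from the proposal:)

    from sqlalchemy import Column
    from airflow.utils.sqlalchemy import UtcDateTime

    # Hypothetical DagRun columns: nullable, so interval-less runs
    # can store NULL instead of a synthetic datetime.
    data_interval_start = Column(UtcDateTime, nullable=True)
    data_interval_end = Column(UtcDateTime, nullable=True)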
I haven't (yet! It's on my list) spent any significant time
thinking about how to make Airflow play nicely with streaming
jobs. If anyone else has ideas here, please share them.
-ash
On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrql...@gmail.com
<mailto:yrql...@gmail.com>> wrote:
Hi Ash and James,
This is an exciting move. What do you think about using this
opportunity to extend Airflow's support to streaming-like use
cases, i.e. DAGs/tasks that want to run forever like a service?
For such use cases a schedule interval might not be meaningful,
so do we want to make the data interval params optional on
DagRun and task instances? That sounds like a pretty major
change to the underlying model of Airflow, but this AIP is so
far the best opportunity I've seen to level up Airflow's
support for streaming/service use cases.
Cheers,
Kevin Y
On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish
<dpstand...@gmail.com <mailto:dpstand...@gmail.com>> wrote:
Very excited to see this proposal come through and love the
direction this has gone.
Couple comments...
*Tree view / Data completeness view*
When you design your tasks with the canonical idempotence
pattern, the tree view shows you both data completeness and
task execution history (success / failure etc).
When you don't use that pattern (which is my general
preference), tree view is only task execution history.
This change has the potential to unlock a data completeness
view for canonical tasks. It's possible that the "data
completeness view" can simply be the tree view. I.e. somehow
it can use these new classes to know what data was successfully
filled and what data wasn't.
To the extent we like the idea of either extending / plugging /
modifying the tree view, or adding a distinct data completeness
view, we might want to anticipate its needs in this change. Maybe
no alteration to the proposal is needed, but I wanted to throw the
idea out there.
*Watermark workflow / incremental processing*
A common pattern in data warehousing is pulling data
incrementally from a source.
A standard way to achieve this: at the start of the task, select
max `updated_at` from the source table and hold on to that value
for a minute. This is your tentative new high watermark.
Now it's time to pull your data: if your task has run before, grab
the last high watermark; if not, use an initial load value.
If the pull succeeds, update the high watermark.
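In plain Python the whole loop might look like this (every name
here is illustrative -- source, sink and state_store are stand-ins
for whatever storage you use):

    INITIAL_LOAD_VALUE = "1970-01-01 00:00:00"

    def pull_incrementally(source, sink, state_store, task_id):
        # Tentative new high watermark: max updated_at now in the source.
        candidate = source.scalar("SELECT max(updated_at) FROM src_table")

        # Last committed watermark if the task ran before, else the
        # initial load value.
        low = state_store.get(task_id, INITIAL_LOAD_VALUE)

        rows = source.fetch(
            "SELECT * FROM src_table WHERE updated_at > %s AND updated_at <= %s",
            (low, candidate),
        )
        sink.write(rows)

        # Only advance the watermark once the load has succeeded.
        state_store.set(task_id, candidate)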
On my team we implemented this with a stateful tasks / stateful
processes concept (there's a dormant draft AIP here
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
and a WatermarkOperator that handled the boilerplate*.
Again here, I don't have a specific suggestion at this moment,
but I wanted to articulate this workflow because it is common,
and it wasn't immediately obvious to me when reading AIP-39 how I
would use it to implement this pattern.
AIP-39 makes Airflow more data-aware, so if it can support
this kind of workflow, that's great. @Ash Berlin-Taylor
<mailto:a...@astronomer.io> do you have thoughts on how it might
be compatible with this kind of thing as-is?
---
* The base operator is designed so that subclasses only need to
implement two methods:
- `get_high_watermark`: produce the tentative new high watermark
- `watermark_execute`: analogous to implementing poke in a
sensor, this is where your work is done. `execute` is left to
the base class, and it orchestrates (1) getting the last high
watermark or initial load value and (2) updating the new high
watermark if the job succeeds.
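A minimal skeleton of how I read that design (not the actual code;
get_state/set_state stand in for AIP-30-style persistence):

    from airflow.models import BaseOperator

    class WatermarkOperator(BaseOperator):
        """Base class; subclasses implement only the two methods below."""

        initial_load_value = None  # e.g. "1970-01-01"

        def get_high_watermark(self, context):
            """Produce the tentative new high watermark."""
            raise NotImplementedError

        def watermark_execute(self, context, low, high):
            """Analogous to a sensor's poke: do the work for (low, high]."""
            raise NotImplementedError

        def execute(self, context):
            high = self.get_high_watermark(context)
            low = self.get_state() or self.initial_load_value  # hypothetical state helper
            self.watermark_execute(context, low, high)
            self.set_state(high)  # hypothetical; only advances on success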