I think, yes, AIP-35 or something like it would happily co-exist with
this proposal.
@Daniel <mailto:dpstand...@gmail.com> and I have been discussing this a
bit on Slack, and one of the questions he asked was if the concept of
data_interval should be moved from DagRun as James and I suggested down
on to the individual task:
suppose i have a new dag hitting 5 api endpoints and pulling data to
s3. suppose that yesterday 4 of them succeeded but one failed. today,
4 of them should pull from yesterday. but the one that failed should
pull from 2 days back. so even though these normally have the same
interval, today they should not.
My view on this is two fold: one, this should primarily be handled by
retries on the task, and secondly, having different TaskIstances in the
same DagRun have different data intervals would be much harder to
reason about/design the UI around, so for those reasons I still think
interval should be a DagRun-level concept.
(He has a stalled AIP-30 where he proposed something to address this
kind of "watermark" case, which we might pick up next after this AIP is
complete)
One thing we might want to do is extend the interface of
AbstractTimetable to be able to add/update parameters in the context
dict, so the interface could become this:
class AbstractTimetable(ABC):
@abstractmethod
def next_dagrun_info(
date_last_automated_dagrun: Optional[pendulum.DateTime],
session: Session,
)-> Optional[DagRunInfo]:
"""
Get information about the next DagRun of this dag after
``date_last_automated_dagrun`` -- the
execution date, and the earliest it could be scheduled
:param date_last_automated_dagrun: The max(execution_date) of
existing
"automated" DagRuns for this dag (scheduled or backfill,
but not
manual)
"""
@abstractmethod
def set_context_variables(self, dagrun: DagRun,
context:Dict[str,Any])->None:
"""
Update or set new context variables to become available in task
templates and operators.
"""
What do people think of this? Worth it? Too complex to reason about
what context variables might exist as a result?
*Outstanding question*:
Should "interval-less DAGs" (ones using "CronTimetable" in my proposal
vs "DataTimetable") have data_interval_start and end available in the
context?Does anyone have any better names than TimeDeltaTimetable,
DataTimetable, and CronTimetable? (We can probably change these names
right up until release, so not important to get this correct
/now/.)Should I try to roll AIP-30
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
in to this, or should we make that a future addition? (My vote is for
future addition)
I'd like to start voting on this AIP next week (probably on Tuesday) as
I think this will be a powerful feature that eases confusing to new
users.
-Ash
On Tue, 2 Mar, 2021 at 23:05, Alex Inhert <alexinh...@yandex.com> wrote:
Is this AIP going to co-exist with AIP-35 "Add Signal Based
Scheduling To Airflow"?
I think streaming was also discussed there (though it wasn't really
the use case).
02.03.2021, 22:10, "Ash Berlin-Taylor" <a...@apache.org>:
Hi Kevin,
Interesting idea. My original idea was actually for "interval-less
DAGs" (i.e. ones where it's just "run at this time") would not have
data_interval_start or end, but (while drafting the AIP) we decided
that it was probably "easier" if those values were always datetimes.
That said, I think having the DB model have those values be nullable
would future proof it without needing another migration to change
it. Do you think this is worth doing now?
I haven't (yet! It's on my list) spent any significant time thinking
about how to make Airflow play nicely with streaming jobs. If anyone
else has ideas here please share them
-ash
On Sat, 27 Feb, 2021 at 16:09, Kevin Yang <yrql...@gmail.com
<mailto:yrql...@gmail.com>> wrote:
Hi Ash and James,
This is an exciting move. What do you think about using this
opportunity to extend Airflow's support to streaming like use
cases? I.e DAGs/tasks that want to run forever like a service. For
such use cases, schedule interval might not be meaningful, then do
we want to make the date interval param optional to DagRun and task
instances? That sounds like a pretty major change to the underlying
model of Airflow, but this AIP is so far the best opportunity I saw
that can level up Airflow's support for streaming/service use cases.
Cheers,
Kevin Y
On Fri, Feb 26, 2021 at 8:56 AM Daniel Standish
<dpstand...@gmail.com <mailto:dpstand...@gmail.com>> wrote:
Very excited to see this proposal come through and love the
direction this has gone.
Couple comments...
*Tree view / Data completeness view*
When you design your tasks with the canonical idempotence pattern,
the tree view shows you both data completeness and task execution
history (success / failure etc).
When you don't use that pattern (which is my general preference),
tree view is only task execution history.
This change has the potential to unlock a data completeness view
for canonical tasks. It's possible that the "data completeness
view" can simply be the tree view. I.e. somehow it can use these
new classes to know what data was successfully filled and what
data wasn't.
To the extent we like the idea of either extending / plugging /
modifying tree view, or adding a distinct data completeness view,
we might want to anticipate the needs of that in this change. And
maybe no alteration to the proposal would be needed but just want
to throw the idea out there.
*Watermark workflow / incremental processing*
A common pattern in data warehousing is pulling data incrementally
from a source.
A standard way to achieve this is at the start of the task, select
max `updated_at` in source table and hold on to that value for a
minute. This is your tentative new high watermark.
Now it's time to pull your data. If your task ran before, grab
last high watermark. If not, use initial load value.
If successful, update high watermark.
On my team we implemented this with a stateful tasks / stateful
processes concept (there's a dormant draft AIP here
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>)
and a WatermarkOperator that handled the boilerplate*.
Again here, I don't have a specific suggestion at this moment.
But I wanted to articulate this workflow because it is common and
it wasn't immediately obvious to me in reading AIP-39 how I would
use it to implement it.
AIP-39 makes airflow more data-aware. So if it can support this
kind of workflow that's great. @Ash Berlin-Taylor
<mailto:a...@astronomer.io> do you have thoughts on how it might be
compatible with this kind of thing as is?
---
* The base operator is designed so that Subclasses only need to
implement two methods:
- `get_high_watermark`: produce the tentative new high
watermark
' `watermark_execute`: analogous to implementing poke in a
sensor, this is where your work is done. `execute` is left to the
base class, and it orchestrates (1) getting last high watermark or
inital load value and (2) updating new high watermark if job
successful.