Alexis, do you mean you would have done this using an ExternalTaskSensor? Or is there some other way to depend on a range of tasks?
On Wed, Aug 8, 2018 at 3:35 PM, Alexis Rolland <alexis.roll...@ubisoft.com> wrote:

> Not sure if it's optimal compared to what James proposes, but I would have
> simply made the weekly and monthly rollup tasks downstream tasks of the
> daily log ingestion tasks they depend on. Then I would have used the
> 'all_done' trigger rule to ensure those rollup tasks start when their
> parent tasks are completed.
>
> https://airflow.incubator.apache.org/concepts.html#trigger-rules
>
> (daily log ingestion) >> (daily rollup)
> (daily log ingestion) >> (weekly rollup + TriggerRule.ALL_DONE)
> (daily log ingestion) >> (monthly rollup + TriggerRule.ALL_DONE)
>
> Cheers
>
> Alexis
>
> On 9 Aug 2018, at 02:57, James Meickle <jmeic...@quantopian.com.INVALID> wrote:
>
> It sounds like you want something like this?
>
>     from airflow.operators.dummy_operator import DummyOperator
>
>     # Assumes these tasks are created inside a `with DAG(...)` block.
>     # SQLOperator stands in for whichever SQL operator or sensor you use.
>     root_operator = DummyOperator(task_id="root")
>
>     def offset_operator(i):
>         # Look back i days from the run's ds (i = 0 is the run's own day).
>         my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, {offset}) }}}};".format(offset=-i)
>         sql_operator = SQLOperator(task_id="offset_by_{}".format(i), query=my_sql_query)
>         return sql_operator
>
>     offset_operators = [offset_operator(i) for i in range(7)]
>     root_operator >> offset_operators
>
>     # Daily just waits on today, no offset
>     do_daily_work = DummyOperator(task_id="do_daily_work")
>     offset_operators[0] >> do_daily_work
>
>     # Weekly waits on today AND the six prior offsets
>     do_weekly_work = DummyOperator(task_id="do_weekly_work")
>     offset_operators >> do_weekly_work
>
> IOW, every day you wait for that day's data to be available, and then run
> the daily job; you also wait for the previous six days' data to be
> available, and when it is, run the weekly job.
>
> n.b. - if you do it this way you will have up to 7 tasks polling the "same"
> data point, which is slightly wasteful. But it's also not much code or
> mental effort to write it this way.
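Alexis's suggestion above hinges on the difference between the default all_success trigger rule and all_done. The snippet below is a simplified, Airflow-free model of the two rules, not Airflow's own dependency-checking code: the state names are Airflow's, but all_success and all_done here are hypothetical helpers. It makes the trade-off concrete: with all_done, a rollup starts as soon as every parent has finished, even if one of them failed, so it can end up running over an incomplete window.

```python
from typing import Iterable

# Terminal task-instance states that count as "done" (Airflow's state names).
DONE_STATES = {"success", "failed", "upstream_failed", "skipped"}


def all_success(upstream_states: Iterable[str]) -> bool:
    """Model of the default rule: every parent succeeded."""
    return all(state == "success" for state in upstream_states)


def all_done(upstream_states: Iterable[str]) -> bool:
    """Model of 'all_done': every parent finished, whatever the outcome."""
    return all(state in DONE_STATES for state in upstream_states)


# Seven daily ingestion parents of a weekly rollup, one of which failed.
week = ["success"] * 6 + ["failed"]
print(all_done(week))     # True: the weekly rollup would start anyway
print(all_success(week))  # False: the default rule would hold it back

# A parent that is still running blocks both rules.
print(all_done(["success"] * 6 + ["running"]))  # False
```

In other words, all_done guarantees the rollup waits for its parents to stop running, not that their data actually landed.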
>
> On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:
>
> My main concern is how to express the fact that the weekly rollup depends
> on the previous 7 days' worth of data, and to ensure that it does not run
> until the tasks that generate those 7 days of data have run, assuming that
> tasks can run non-sequentially.
>
> It's easy enough when you have the following situation:
>
> (daily log ingestion) <-- (daily rollup)
>
> In any given DAG run, you are guaranteed to have the data needed for (daily
> rollup), because the dependency that generated its data just ran.
>
> But I'm not sure how best to model it when you have all of the following:
>
> (daily log ingestion) <-- (daily rollup)
> (daily log ingestion) <-- (weekly rollup)
> (daily log ingestion) <-- (monthly rollup)
>
> On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmis...@gmail.com> wrote:
>
> Gabriel -
>
> Ah, I missed your earlier comment about the weekly/monthly rollups also
> being on a daily cadence. So is your concern more about reducing the
> redundant processing the weekly rollup tasks do for days in the range that
> were already processed in the previous DAG run(s)? Or mainly about not
> executing the first weekly rollup at all until the first 7 daily rollups'
> worth of data have built up?
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/>
> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>
>
> On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeic...@quantopian.com.invalid> wrote:
>
> If you want to run (daily, rolling weekly, rolling monthly) rollups on a
> daily basis, and they're mostly the same but have some additional
> dependencies, you can write a DAG factory method, which you call three
> times. Certain nodes only get added to the longer-than-daily rollups.
>
> On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:
>
> Thanks Andy and Taylor for the suggestions --
>
> I see how that would work for the case where you want a weekly rollup that
> runs on a weekly cadence.
>
> But what about a rolling weekly or monthly rollup that runs each day?
>
> On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.coo...@astronomer.io> wrote:
>
> To expand on Taylor's idea:
>
> I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a
> task from running if it meets the criteria provided. It accepts an array of
> args for any number of the criteria, so you could leverage this sensor to
> provide "blackout" runs for a range of days of the week.
>
> https://github.com/apache/incubator-airflow/pull/3702/files
>
> For example,
>
>     task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
>
> would prevent a task from running Monday - Saturday, allowing it to run on
> Sunday.
>
> You could leverage this sensor as you would any other sensor, or you could
> invert the logic so that you would only need to specify
>
>     task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
>
> to "whitelist" a task to run on Sundays.
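The weekday check behind Andy's ScheduleBlackoutSensor example and Taylor's SundaySensor idea can be sketched without Airflow. This is not the code from the PR linked above; in_blackout and should_run_weekly are hypothetical names, and the sketch only assumes Python's datetime.weekday() convention (Monday is 0, Sunday is 6), which matches the day numbers used in the thread. It also confirms that blacking out days 0 through 5 allows exactly the same runs as whitelisting day 6.

```python
from datetime import datetime
from typing import Iterable, Union

SUNDAY = 6  # datetime.weekday(): Monday == 0 ... Sunday == 6


def in_blackout(execution_date: datetime, day_of_week: Union[int, Iterable[int]]) -> bool:
    """True if execution_date falls on a blacked-out weekday.

    Mirroring the example calls in the thread, accepts one weekday or a list.
    """
    days = {day_of_week} if isinstance(day_of_week, int) else set(day_of_week)
    return execution_date.weekday() in days


def should_run_weekly(execution_date: datetime) -> bool:
    """The SundaySensor condition: only let the weekly rollup run on Sundays."""
    return execution_date.weekday() == SUNDAY


# Check one full week (2018-08-06 is a Monday): blacking out Monday-Saturday
# leaves exactly the days a Sunday whitelist would allow.
for day in range(6, 13):
    run = datetime(2018, 8, day)
    assert should_run_weekly(run) == (not in_blackout(run, [0, 1, 2, 3, 4, 5]))

print(should_run_weekly(datetime(2018, 8, 12)))  # True (a Sunday)
```

Note that this gate only decides which daily runs do the weekly work; it does not by itself check that the preceding days' data exists.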
>
> Let me know if you have any questions
>
> On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmis...@gmail.com> wrote:
>
> Gabriel -
>
> One approach I've seen for a similar use case is to have multiple related
> rollups in one DAG that runs daily, then have the non-daily tasks skip most
> of the time (e.g., the weekly rollup only actually executes on Sundays and
> is parameterized to look at the last 7 days).
>
> You could implement the skipping a few ways, but one idea is a sensor in
> front of the weekly rollup task. Imagine a SundaySensor that simply does
> return execution_date.weekday() == 6. One thing to keep in mind here is
> that this depends on the DAG's cron schedule being more granular than the
> tasks.
>
> I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> that would be nice to have.
>
> Of course this does mean some scheduler inefficiency on the skip days, but
> as long as those skips are fast and the overall number of tasks is small, I
> can accept that.
>
> On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid> wrote:
>
> Hello Airflow community,
>
> I have a basic question about how best to model a common data pipeline
> pattern here at Dropbox.
>
> At Dropbox, all of our logs are ingested and written into Hive in hourly
> and/or daily rollups. On top of this data we build many weekly and monthly
> rollups, which typically run on a daily cadence and compute results over a
> rolling window.
>
> If we have a metric X, it seems natural to put the daily, weekly, and
> monthly rollups for metric X all in the same DAG.
>
> However, the different rollups have different dependency structures. The
> daily job only depends on a single day partition, whereas the weekly job
> depends on 7, and the monthly on 28.
>
> In Airflow, it seems the two paradigms for modeling dependencies are:
> 1) Depend on a *single run of a task* within the same DAG
> 2) Depend on *multiple runs of a task* by using an ExternalTaskSensor
>
> I'm not sure how I could possibly model this scenario using approach #1,
> and I'm not sure approach #2 is the most elegant or performant way to model
> this scenario.
>
> Any thoughts or suggestions?
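Whichever mechanism ends up enforcing it (an ExternalTaskSensor, per-offset sensors as in James's reply, or a custom check), the rule Gabriel describes reduces to: a rollup for a given ds may run only once every daily partition in its window has landed, regardless of the order in which ingestion ran. A minimal sketch, assuming the landed set stands in for whatever actually records completed ingestion (task-instance state, the Hive metastore, etc.); WINDOWS, missing_partitions and ready_rollups are hypothetical names, and the 1/7/28 window sizes come from the message above.

```python
from datetime import date, timedelta
from typing import Dict, List, Set

# Window sizes from the thread: daily = 1 partition, weekly = 7, monthly = 28.
WINDOWS: Dict[str, int] = {"daily": 1, "weekly": 7, "monthly": 28}


def missing_partitions(ds: str, days: int, landed: Set[str]) -> List[str]:
    """Daily partitions in the `days`-long window ending at `ds` that have not landed."""
    end = date.fromisoformat(ds)  # ds is a YYYY-MM-DD string, like Airflow's ds
    needed = [(end - timedelta(days=offset)).isoformat() for offset in range(days)]
    return [p for p in needed if p not in landed]


def ready_rollups(ds: str, landed: Set[str]) -> Dict[str, bool]:
    """For each rollup, whether its whole rolling window is available for this ds."""
    return {name: not missing_partitions(ds, days, landed) for name, days in WINDOWS.items()}


# Ingestion has landed 2018-08-02 .. 2018-08-08, except 2018-08-05, which is
# still running (ingestion tasks may complete out of order).
landed = {(date(2018, 8, 8) - timedelta(days=i)).isoformat() for i in range(7)}
landed.discard("2018-08-05")

print(ready_rollups("2018-08-08", landed))
# {'daily': True, 'weekly': False, 'monthly': False}
print(missing_partitions("2018-08-08", 7, landed))
# ['2018-08-05']
```

Because the check is per partition rather than "did the previous run succeed", it works the same way when runs are backfilled or finish non-sequentially.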