The rationale for a task-level schedule is that it removes the need to spin up a task just to decide whether to skip.
@Malthe I'm curious how many such "redundant" decision-making tasks/junctions you have in your DAGs. If a DAG has too many of these junctions, maybe the tasks are not really part of that DAG and should simply be moved to another DAG with a different schedule? I too have use cases for skipping tasks in workflows, but the number of such "redundant" junctions I have is small compared to the number of tasks in my system, so it's very tolerable.

On Mon, Nov 8, 2021 at 4:37 AM Daniel Standish <[email protected]> wrote:

> typo:
>>
>> I'm a bit skeptical that there would be a good interface for like a
>> task-level-timetable-override if that's where you're going -- like a way
>> for each schedule *task* to have its own timetable override that's
>> processed within the dag run.
>>
>
> On Sun, Nov 7, 2021 at 6:34 PM Daniel Standish <[email protected]>
> wrote:
>
>> A bit confused about what you're proposing Malthe.
>>
>> The thread subject is "task-level scheduling" but it says "there is no
>> such interface to control
>> task-level scheduling – or more specifically, the ability to control
>> which DAG runs to skip."
>>
>> This makes it sound like you're talking about being able to have logic to
>> skip a dag.
>>
>> If we're really talking about skipping dag runs, I suppose it depends on
>> what the skip logic is. If it's date based you could use the timetable
>> interface to do so, otherwise yeah it would have to be done from a task. I
>> don't think of a maybe-skip task as causing that much overhead.
>>
>> But maybe you're talking about having skip logic within each task that
>> isn't managed by a branch operator or similar.
>>
>> It's fairly easy to achieve this already with a subclass.
>>
>>     from airflow.exceptions import AirflowSkipException
>>     from airflow.operators.bash import BashOperator
>>
>>     class SkippyBashOperator(BashOperator):
>>         def execute(self, context):
>>             # certain_conditions is a placeholder for your own skip logic
>>             if certain_conditions:
>>                 raise AirflowSkipException("I skipped")
>>             return super().execute(context)
>>
>> re
>>
>>> It should be evident from the task execution details why the task was
>>> skipped – the interface should provide the necessary string
>>> representation functionality.
>>>
>>
>> I think that being able to put a `reason` in AirflowSkipException and
>> have that be somehow visible in the UI could be good. Also, being able to
>> specify `color` could be nice.
>>
>> re
>>
>>> Main problem of such an approach (currently) is that there is no
>>> visual indication that a specific task was actually "skipped" rather
>>> than "executed"
>>>
>>
>> The current UI _does_ already indicate when a task is skipped -- it's a
>> light pink color. But maybe I misunderstand something.
>>
>> I'm a bit skeptical that there would be a good interface for like a
>> task-level-timetable-override if that's where you're going -- like a way
>> for each schedule to have its own timetable override that's processed
>> within the dag run.
>>
>> Or are you proposing that we allow tasks to exist and be schedulable
>> without being associated with a dag?
>>
>>
>> On Sun, Nov 7, 2021 at 2:07 PM Jarek Potiuk <[email protected]> wrote:
>>
>>> I have mixed feelings about that one.
>>>
>>> I understand that sometimes you would like to skip processing of a
>>> single task using schedule "criteria", but IMHO that creates some
>>> ambiguities and the necessity of more complex logic for the remainder
>>> of DAGs. One thing is that you cannot really specify "output" for such
>>> a task. If it is skipped then it does not produce any output, so the
>>> logic of skipping previous tasks should be included in the following
>>> tasks.
>>>
>>> You really need to say "if another task was skipped, don't use its
>>> output". Or similar.
>>> Another variant of this is "if the output does
>>> not exist", but that's a bit of implicit behaviour and does not play
>>> well with some edge cases, for example adding such a conditional
>>> skip and back-filling history. If you base it on the existence of
>>> output, backfilling will not work.
>>>
>>> So if you have tasks that pass any outputs, this approach might be
>>> quite problematic IMHO.
>>>
>>> Somehow I have a feeling that it's much easier to do this kind of
>>> skipping using either the branch operator mentioned by Ash (then the
>>> output might be either "prepared" or "empty" depending on the branch)
>>> or even have a custom PythonOperator/@task callable where you produce
>>> either "prepared" or "empty" output based on some time/cron logic.
>>> That somehow feels much more consistent and much more flexible, as you
>>> can base your decision on how you are running the task on more
>>> criteria.
>>>
>>> Main problem of such an approach (currently) is that there is no
>>> visual indication that a specific task was actually "skipped" rather
>>> than "executed". This is true, but I see it (and this is a much
>>> more generic approach) as an opportunity here to add such an indicator
>>> of task execution "flavour".
>>>
>>> When a task was executed it could be executed this or that way and
>>> mark its status as such. This could be an icon, border, text color (I
>>> guess Brent can come up with some ideas here) - some way to indicate that
>>> the task in this "dagrun" was run differently than in "yesterday's"
>>> one. And it would not be limited to a "time schedule" difference only.
>>> It could be based on much more complex and different criteria. If we
>>> actually "execute" such a task rather than just "skip/run" we have the
>>> powers of custom Python code working for us.
>>>
>>> I can imagine very different "flavours" of execution:
>>>
>>> * based on time of day/week etc.
>>>   (your case)
>>> * based on amount of data to process
>>> * based on number of errors/warnings encountered during processing
>>> * based on type of data seen
>>>
>>> Also the problem with "task-based schedule" is that, because the
>>> scheduler cannot run any custom DAG-writer provided code, the
>>> flexibility of timeline logic is limited to whatever has been
>>> installed as a plugin by the admin. If we assume that "task execution"
>>> actually happens for such "conditional execution tasks", then we can
>>> run code which has been written by the DAG writer - which adds to
>>> the flexibility of task execution logic, and this flexibility is
>>> infinite.
>>>
>>> I feel that adding a timeline-only "flavour" of a run is very limiting
>>> and we can do better.
>>>
>>> But I am happy to discuss it still.
>>>
>>> J.
>>>
>>> On Thu, Nov 4, 2021 at 12:04 PM Ash Berlin-Taylor <[email protected]>
>>> wrote:
>>> >
>>> > For context, the reason Malthe is proposing something like this, and
>>> > doesn't want to use the "existing" approach of a BranchOperator or similar,
>>> > is optimization: having to spin up a task to make a decision is, in many
>>> > cases, not necessary and the scheduler could make this decision quickly.
>>> >
>>> > (This is along similar lines to why we no longer schedule or actually
>>> > run DummyOperator but just mark it as success directly in the scheduler.)
>>> >
>>> > > AIP-39 is a little unclear on how the new "logical_date" value changes
>>> > > with the different timetable implementations or if it's simply used
>>> > > internally for sorting purposes and not meaningful on its own. For this
>>> > > proposal to work, there has to be a well-defined "execution date" that we
>>> > > can compare against.
>>> >
>>> > data_interval_start and/or data_interval_end are the dates you should
>>> > use for such a purpose.
>>> >
>>> > Please don't use the term execution date -- it is too overloaded and
>>> > confusing.
>>> >
>>> > -ash
>>> >
>>> >
>>> > On Mon, Oct 18 2021 at 21:17:22 +0000, Malthe <[email protected]>
>>> > wrote:
>>> >
>>> > > While AIP-39 provides an interface for more powerful pluggable
>>> > > scheduling behaviours, there is no such interface to control task-level
>>> > > scheduling – or more specifically, the ability to control which DAG runs to
>>> > > skip.
>>> > >
>>> > > Examples:
>>> > >
>>> > > - Skip task execution on certain days
>>> > > - Skip task execution on certain hours which could vary from day to day
>>> > >
>>> > > Whether or not child tasks would be affected by such task scheduling
>>> > > depends on the trigger rule configured on those tasks (e.g.
>>> > > "all_success", "all_done").
>>> > >
>>> > > The interface might consist of both an include and exclude expression –
>>> > > by default all executions would be included and none excluded. In both
>>> > > cases, the scheduling could be a cron expression but the interface should
>>> > > again support more powerful behaviors.
>>> > >
>>> > > It should be evident from the task execution details why the task was
>>> > > skipped – the interface should provide the necessary string
>>> > > representation functionality.
>>> > >
>>> > > AIP-39 is a little unclear on how the new "logical_date" value changes
>>> > > with the different timetable implementations or if it's simply used
>>> > > internally for sorting purposes and not meaningful on its own. For this
>>> > > proposal to work, there has to be a well-defined "execution date" that we
>>> > > can compare against.
>>>
>>
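
For readers trying to picture the include/exclude interface described in the original proposal, here is a minimal sketch in plain Python. It is an illustration only: `Rule`, `Decision`, `TaskSchedule`, and `decide` are hypothetical names that do not exist in Airflow or AIP-39, and the sketch assumes the comparison date is `data_interval_start`, as suggested earlier in the thread. It also assumes rules are plain Python predicates rather than cron expressions, to stay dependency-free.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Optional


@dataclass(frozen=True)
class Rule:
    """Hypothetical rule: a human-readable description plus a predicate."""
    description: str
    matches: Callable[[datetime], bool]


@dataclass(frozen=True)
class Decision:
    """Hypothetical result: whether to run, and a string explaining why."""
    run: bool
    reason: str


@dataclass(frozen=True)
class TaskSchedule:
    """Hypothetical task-level schedule (not an Airflow class)."""
    include: Optional[Rule] = None  # None means "include every run"
    exclude: Optional[Rule] = None  # None means "exclude no run"

    def decide(self, data_interval_start: datetime) -> Decision:
        # A run must match the include rule, if one is set.
        if self.include is not None and not self.include.matches(data_interval_start):
            return Decision(False, f"not included: {self.include.description}")
        # A run must not match the exclude rule, if one is set.
        if self.exclude is not None and self.exclude.matches(data_interval_start):
            return Decision(False, f"excluded: {self.exclude.description}")
        return Decision(True, "scheduled")


# Example: skip task execution on weekends.
weekends = Rule("weekends", lambda dt: dt.weekday() >= 5)
schedule = TaskSchedule(exclude=weekends)

saturday = schedule.decide(datetime(2021, 11, 6))
monday = schedule.decide(datetime(2021, 11, 8))
print(saturday.run, saturday.reason)
print(monday.run, monday.reason)
```

With today's Airflow, one way to use a decision like this would be the subclass approach quoted above: evaluate it inside `execute` and raise `AirflowSkipException` with `reason` as the message. That still spins up the task, so it does not deliver the optimization the proposal is after.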
