The reasoning behind having a task schedule is explained by removing the
need to spin up a task to make the decision about skipping.

@Malthe I'm curious to know how many such "redundant" decision making
tasks/junctions you have in your DAGs?
I wonder if a DAG has too many of such junctions, maybe the tasks are not
really part of that DAG and should simply be moved to another DAG with a
different scheduling?
I too have use cases of skipping tasks from workflows but counting how many
such "redundant" junctions I have is nothing compared to the number of
tasks in my system so it's very tolerable.

On Mon, Nov 8, 2021 at 4:37 AM Daniel Standish <[email protected]> wrote:

> typo:
>>
>> I'm a bit skeptical that there would be a good interface for like a
>> task-level-timetable-override if that's where you're going -- like a way
>> for each schedule *task *to have its own timetable override that's
>> processed within the dag run.
>>
>
> On Sun, Nov 7, 2021 at 6:34 PM Daniel Standish <[email protected]>
> wrote:
>
>> A bit confused about what you're proposing Malthe.
>>
>> The thread subject is "task-level schedulling" but it says "there is no
>> such interface to control
>> task-level scheduling – or more specifically, the ability to control
>> which DAG runs to skip."
>>
>> This makes it sound like you're talking about being able to have logic to
>> skip a dag.
>>
>> If we're really talking about skipping dag runs, I suppose it depends
>> what the skip logic is.   If it's date based you could use the timetable
>> interface to do so, otherwise yeah it would have to be done from a task.  I
>> don't think of a maybe-skip task as causing that much overhead.
>>
>> But maybe you're talking about having skip logic within each task that
>> isn't managed by a branch operator or similar.
>>
>> It's fairly easy to achieve this already with a subclass.
>>
>> class SkippyBashOperator(BashOperator):
>>     def execute(context):
>>         if certain_conditions:
>>             raise AirflowSkipException("I skipped")
>>         super().execute(context)
>>
>> re
>>
>> It should be evident from the task execution details why the task was
>>> skipped – the interface should provide the necessary string
>>> representation functionality.
>>>
>>
>> i think that being able to put a `reason` in AirlfowSkipException and
>> have that be somehow visible in UI could be good. Also, being able to
>> specify `color` could also be nice.
>>
>> re
>>
>>> Main problem of such an approach (currently) is that there is no
>>> visual indication that a specific task was actually "skipped" rather
>>> than "executed"
>>>
>>
>> The current UI _does_ already indicate when a task is skipped -- it's a
>> light pink color.  But maybe I misunderstand something.
>>
>> I'm a bit skeptical that there would be a good interface for like a
>> task-level-timetable-override if that's where you're going -- like a way
>> for each schedule to have its own timetable override that's processed
>> within the dag run.
>>
>> Or are you proposing that we allow tasks to exist and be schedulable
>> without being associated with a dag?
>>
>>
>> On Sun, Nov 7, 2021 at 2:07 PM Jarek Potiuk <[email protected]> wrote:
>>
>>> I have mixed feelings about that one.
>>>
>>> I understand that sometimes you would like to skip processing of a
>>> single task using schedule "criteria", but IMHO that creates some
>>> ambiguities and the necessity of more complex logic for the remainder
>>> of DAGs. One thing is that you cannot really specify "output" for such
>>> a task. If it is skipped then it does not produce any output, so the
>>> logic of skipping previous tasks should be included in the following
>>> tasks.
>>>
>>> You really need to say "if another task was skipped, don't use it's
>>> output". Or similar. Another variant of this is "if the output does
>>> not exist", but that's a bit of implicit behaviour and does not play
>>> well with - some edge cases - for example - adding such conditional
>>> skip and back-filling history. If you base it on the existence of
>>> output backfilling will not work.
>>>
>>> So if you have tasks that pass any outputs, this approach might be
>>> quite problematic IMHO.
>>>
>>> Somehow I have a feeling that it's much easier to this kind of
>>> skipping using either the branch operator mentioned by Ash (then the
>>> output might be either "prepared' or "empty" depending on the branch)
>>> or even have custom PythonOperator/@task callable where you produce
>>> either "prepared" or "empty" output based on some time/cron logic.
>>> That somehow feels much more consistent and much more flexible, as you
>>> can base your decision on how you are running the task based on more
>>> criteria..
>>>
>>> Main problem of such an approach (currently) is that there is no
>>> visual indication that a specific task was actually "skipped" rather
>>> than "executed". This is true, but as I see it (and this is a much
>>> more generic approach) as an opportunity here to add such an indicator
>>> of task execution "flavour".
>>>
>>> When a task was executed it could be executed this or that way and
>>> mark it's status as such. This could be an icon, border, text color (I
>>> guess Brent can come with some ideas here) - some way to indicate that
>>> the task in this "dagrun" was run differently than in the "yesterdays"
>>> one. And it would not be limited to a "time schedule" difference only.
>>> It could be based on much more complex and different criteria. If we
>>> actually "execute" such a task rather than just "skip/run" we have the
>>> powers of custom Python code working for us.
>>>
>>> I can imagine very different "flavours" of execution:
>>>
>>> * based on time of day/week etc. (your case)
>>> * based on amount of data to process
>>> * based on number of errors/warnings encountered during processing
>>> * based on type of data seen
>>>
>>> Also the problem with "task-based schedule" is that due to the
>>> scheduler that cannot run any custom DAG-writer provided code, the
>>> flexibility of timeline logic is limited to whatever has been
>>> installed as a plugin by the admin. If we assume that "task execution"
>>> actually happens for such "conditional execution tasks", then we can
>>> run a code which has been written by the DAG writer - which adds to
>>> the flexibility of task execution logic and this flexibility is
>>> infinite.
>>>
>>> I feel that adding a timeline-only "flavour" of a run is very limiting
>>> and we can do better.
>>>
>>> But I am happy to discuss it stil.
>>>
>>> J.
>>>
>>> On Thu, Nov 4, 2021 at 12:04 PM Ash Berlin-Taylor <[email protected]>
>>> wrote:
>>> >
>>> > For context, the reason Malthe is proposing something like this, and
>>> doesn't want to use the "existing" approach of a BranchOperator or similar
>>> is optimization: Having to spin up a task to make a decision is, in many
>>> cases, not necessary and the scheduler could make this decision quickly.
>>> >
>>> > (This is along similar lines to why we no longer schedule or actually
>>> run DummyOperator but just mark it as success directly in the scheduler.)
>>> >
>>> > AIP-39 is a little unclear on how the new "logical_date" value changes
>>> with the different timetable implementations or if it's simply used
>>> internally for sorting purposes and not meaningful on its own. For this
>>> proposal to work, there has to be a well-defined "execution date" that we
>>> can compare against.
>>> >
>>> >
>>> >
>>> > data_interval_start and/or data_interval_end are the dates you should
>>> use for such a purpose
>>> >
>>> > Please don't use the term execution date -- it is too overloaded and
>>> confusing.
>>> >
>>> > -ash
>>> >
>>> >
>>> > On Mon, Oct 18 2021 at 21:17:22 +0000, Malthe <[email protected]>
>>> wrote:
>>> >
>>> > While AIP-39 provides an interface for more powerful pluggable
>>> scheduling behaviours, there is no such interface to control task-level
>>> scheduling – or more specifically, the ability to control which DAG runs to
>>> skip. Examples: - Skip task execution on certain days - Skip task execution
>>> on certain hours which could vary from day to day Whether or not child
>>> tasks would be affected by such task scheduling depends on the trigger rule
>>> configured on those tasks (e.g. "all_success", "all_done"). The interface
>>> might consist of both an include and exclude expression – by default all
>>> executions would be included and none excluded. In both cases, the
>>> scheduling could be a cron expression but the interface should again
>>> support more powerful behaviors. It should be evident from the task
>>> execution details why the task was skipped – the interface should provide
>>> the necessary string representation functionality. AIP-39 is a little
>>> unclear on how the new "logical_date" value changes with the different
>>> timetable implementations or if it's simply used internally for sorting
>>> purposes and not meaningful on its own. For this proposal to work, there
>>> has to be a well-defined "execution date" that we can compare against.
>>>
>>

Reply via email to