A bit confused about what you're proposing Malthe.

The thread subject is "task-level schedulling" but it says "there is no
such interface to control
task-level scheduling – or more specifically, the ability to control which
DAG runs to skip."

This makes it sound like you're talking about being able to have logic to
skip a dag.

If we're really talking about skipping dag runs, I suppose it depends what
the skip logic is.   If it's date based you could use the timetable
interface to do so, otherwise yeah it would have to be done from a task.  I
don't think of a maybe-skip task as causing that much overhead.

But maybe you're talking about having skip logic within each task that
isn't managed by a branch operator or similar.

It's fairly easy to achieve this already with a subclass.

class SkippyBashOperator(BashOperator):
    def execute(context):
        if certain_conditions:
            raise AirflowSkipException("I skipped")
        super().execute(context)

re

It should be evident from the task execution details why the task was
> skipped – the interface should provide the necessary string
> representation functionality.
>

i think that being able to put a `reason` in AirlfowSkipException and have
that be somehow visible in UI could be good. Also, being able to specify
`color` could also be nice.

re

> Main problem of such an approach (currently) is that there is no
> visual indication that a specific task was actually "skipped" rather
> than "executed"
>

The current UI _does_ already indicate when a task is skipped -- it's a
light pink color.  But maybe I misunderstand something.

I'm a bit skeptical that there would be a good interface for like a
task-level-timetable-override if that's where you're going -- like a way
for each schedule to have its own timetable override that's processed
within the dag run.

Or are you proposing that we allow tasks to exist and be schedulable
without being associated with a dag?


On Sun, Nov 7, 2021 at 2:07 PM Jarek Potiuk <[email protected]> wrote:

> I have mixed feelings about that one.
>
> I understand that sometimes you would like to skip processing of a
> single task using schedule "criteria", but IMHO that creates some
> ambiguities and the necessity of more complex logic for the remainder
> of DAGs. One thing is that you cannot really specify "output" for such
> a task. If it is skipped then it does not produce any output, so the
> logic of skipping previous tasks should be included in the following
> tasks.
>
> You really need to say "if another task was skipped, don't use it's
> output". Or similar. Another variant of this is "if the output does
> not exist", but that's a bit of implicit behaviour and does not play
> well with - some edge cases - for example - adding such conditional
> skip and back-filling history. If you base it on the existence of
> output backfilling will not work.
>
> So if you have tasks that pass any outputs, this approach might be
> quite problematic IMHO.
>
> Somehow I have a feeling that it's much easier to this kind of
> skipping using either the branch operator mentioned by Ash (then the
> output might be either "prepared' or "empty" depending on the branch)
> or even have custom PythonOperator/@task callable where you produce
> either "prepared" or "empty" output based on some time/cron logic.
> That somehow feels much more consistent and much more flexible, as you
> can base your decision on how you are running the task based on more
> criteria..
>
> Main problem of such an approach (currently) is that there is no
> visual indication that a specific task was actually "skipped" rather
> than "executed". This is true, but as I see it (and this is a much
> more generic approach) as an opportunity here to add such an indicator
> of task execution "flavour".
>
> When a task was executed it could be executed this or that way and
> mark it's status as such. This could be an icon, border, text color (I
> guess Brent can come with some ideas here) - some way to indicate that
> the task in this "dagrun" was run differently than in the "yesterdays"
> one. And it would not be limited to a "time schedule" difference only.
> It could be based on much more complex and different criteria. If we
> actually "execute" such a task rather than just "skip/run" we have the
> powers of custom Python code working for us.
>
> I can imagine very different "flavours" of execution:
>
> * based on time of day/week etc. (your case)
> * based on amount of data to process
> * based on number of errors/warnings encountered during processing
> * based on type of data seen
>
> Also the problem with "task-based schedule" is that due to the
> scheduler that cannot run any custom DAG-writer provided code, the
> flexibility of timeline logic is limited to whatever has been
> installed as a plugin by the admin. If we assume that "task execution"
> actually happens for such "conditional execution tasks", then we can
> run a code which has been written by the DAG writer - which adds to
> the flexibility of task execution logic and this flexibility is
> infinite.
>
> I feel that adding a timeline-only "flavour" of a run is very limiting
> and we can do better.
>
> But I am happy to discuss it stil.
>
> J.
>
> On Thu, Nov 4, 2021 at 12:04 PM Ash Berlin-Taylor <[email protected]> wrote:
> >
> > For context, the reason Malthe is proposing something like this, and
> doesn't want to use the "existing" approach of a BranchOperator or similar
> is optimization: Having to spin up a task to make a decision is, in many
> cases, not necessary and the scheduler could make this decision quickly.
> >
> > (This is along similar lines to why we no longer schedule or actually
> run DummyOperator but just mark it as success directly in the scheduler.)
> >
> > AIP-39 is a little unclear on how the new "logical_date" value changes
> with the different timetable implementations or if it's simply used
> internally for sorting purposes and not meaningful on its own. For this
> proposal to work, there has to be a well-defined "execution date" that we
> can compare against.
> >
> >
> >
> > data_interval_start and/or data_interval_end are the dates you should
> use for such a purpose
> >
> > Please don't use the term execution date -- it is too overloaded and
> confusing.
> >
> > -ash
> >
> >
> > On Mon, Oct 18 2021 at 21:17:22 +0000, Malthe <[email protected]> wrote:
> >
> > While AIP-39 provides an interface for more powerful pluggable
> scheduling behaviours, there is no such interface to control task-level
> scheduling – or more specifically, the ability to control which DAG runs to
> skip. Examples: - Skip task execution on certain days - Skip task execution
> on certain hours which could vary from day to day Whether or not child
> tasks would be affected by such task scheduling depends on the trigger rule
> configured on those tasks (e.g. "all_success", "all_done"). The interface
> might consist of both an include and exclude expression – by default all
> executions would be included and none excluded. In both cases, the
> scheduling could be a cron expression but the interface should again
> support more powerful behaviors. It should be evident from the task
> execution details why the task was skipped – the interface should provide
> the necessary string representation functionality. AIP-39 is a little
> unclear on how the new "logical_date" value changes with the different
> timetable implementations or if it's simply used internally for sorting
> purposes and not meaningful on its own. For this proposal to work, there
> has to be a well-defined "execution date" that we can compare against.
>

Reply via email to