I think marking a task as a success where it has not been executed is really an anti-pattern after thinking a bit. \
I really think that if tasks should be successful in some conditions where they should not actually "do stuff" is the logic of the task, not airflow behaviour around it. This does not add value for the user - mostly because you cannot distinguish the state in the UI. This will be confusing like crazy when you have DAGs which will look the same if the task was effectively "skipped" or "executed" - this is abusing rather than using Airflow. J. On Mon, Feb 14, 2022 at 2:11 AM Hongyi Wang <[email protected]> wrote: > > Hi Daniel and Jarek, > > After discussing with XD and the rest of our team members, we decided to move > forward with the `pre_execute` solution. Thank you @Deniel for raising the > idea. Thank you @Jarek for explaining the tradeoffs in detail. > > Besides "how to skip tasks", another thing I'd love to discuss is the "skip > state to set". > > | So the skipped task needs to have a State identical to "Success", other > than the name. Hence a new state may be needed > > When skipping a task, (in `pre_execute`) we can either choose to mark it as > "SUCCESS" or "SKIPPED". To differentiate skipped tasks from successful ones, > "SKIPPED" is preferred, however, it may not work well with some trigger_rules > like `all_success`. Therefore, I am thinking to introduce a new state e.g. > "SKIPPED_AS_SUCCESS", which is different from "SUCCESS", but will be treated > the same as "SUCCESS" when evaluating dependency rules. > > WYT? > > Howie > > On Sat, Feb 5, 2022 at 8:10 AM Jarek Potiuk <[email protected]> wrote: >> >> I thought a bit about it and there is one more thing that I do not really >> like about the approach when you specify the task to skip in the dag run >> conf. >> >> Currently the "logic" of the dag run conf is exclusively "dag specific" and >> only used at "task execution" time (mind - it is not used in "dag scheduling >> time" at all. Currently "Airflow Core" does not have to know anything about >> it and can transparently pass it to the task without parsing it. And it's >> the logic inside the DAG that should "understand" it. There is not a single >> other use case where dag_run conf is used and parsed by scheduler. >> Trying to make "dag_run.conf" to be used either in "Airflow Core" or in "DAG >> logic" depending on the parameter, is IMHO very wrong because it is mixing >> independent concerns. >> >> Currently things are simple: >> >> * dag structure is defined by DAG parsing (this is the only thing Scheduler >> cares) >> * task execution uses the "static" structure defined by DAG parsing and uses >> dag_run conf to implement logic to react to different parameters >> >> When you look at this from that perspective - the proposal is a weird >> mixture of both. And if we do it, in the future it **might** prevent us from >> doing a number of optimizations. For example dag_run_conf does not have to >> be parsed at all when the scheduler does its job. The only place dag_run >> conf is parsed is only at the moment the task is executed. >> >> For me this is a huge '-1' of the solution. >> >> Coming back XD to your example - I think what you are describing there is a >> particular DAG requirement, not something that Airflow core should take care >> about. Also - no matter how the task will be skipped (whether in pre-execute >> or by scheduler, if you want to "Skip" B2 but behave as if it "succeeded" >> you have exactly the same problems with triggering rules. It completely does >> not matter if you "skip" by throwing an exception or "Skip" by airflow. What >> you are really talking about it is not to "skip" certain tasks but rally >> about "pretending they succeeded". This is an entirely different thing than >> that we started the discussion with :). We were talking about "skipping" the >> task - which in Airflow is a different state than "Succeeded". >> >> And if you really want to "pretend success" you don't even have to (or even >> should) use 'pre_execute' for it. There are better ways already. >> >> If you are using the "modern" way of writing tasks (which I assume is the >> case for anyone who does custom work like that) I imagine your B2 task >> should be written this way (writing from memory so it might not compile): >> >> @task >> def task_b2(context: 'Context'): >> if context.dag_run.conf.get("should_task2_pretend_it_succeeded"): >> return >> # do stuff (remember to use hooks instead of the operators in the @task >> decorated tasks to use multiple Airflow providers) >> >> IMHO - this is sooooooo much more "Modern Airflow way" of approaching the >> problem: >> >> * it's very simple and follows the "modern" way of writing Airflow tasks >> (see also my talk about it from this week NYC Airflow Meetup >> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA) >> * the logic of "pretending" is exactly there next to the code that >> otherwise should be run and it's very obvious what happens (otherwise you >> have to get a mental jump from DAG parameters passed through scheduler and >> task execution) >> * it's extremely explicit (https://www.python.org/dev/peps/pep-0020/) - >> "explicit is better than implicit" >> * dag_run configuration does not have to be understood by scheduler >> * your DAG structure does not have to be "complicated" as the task will end >> up in "success" state >> * this is imperative, not declarative, and DAG writer decides how "limited" >> the conditions are - Airflow does not limit you in any way and the condition >> can be arbitrary complex or simple (and take into account other parameters, >> dag_run, logical dates, actual dates of execution and basically anything >> else. >> >> From all the comments I saw - I am pretty convinced this is a much better >> way to approach it. >> >> BTW. There is indeed one "potential" pro of the "scheduler-based" skip >> calculation. It can provide a little optimization - the task does not need >> to be run at all to make the decision in this case (similarly what we do >> with DummyTasks now). But IMHO, this is totally offset by the fact that the >> scheduler would have to parse and analyse the dag_run conf to make the >> decisions - overall, it could be even slower in a number of cases. The >> scheduler loop is pretty critical and any extra logic and parsing there >> might have huge, unforeseen initial impact and we should only add any logic >> there if we are absolutely sure all the potential performance impact is well >> analyzed and understood. >> >> >> Giorgio Zoppi, >> >> XCom being "small" is also a thing of the past (if you use custom XCom >> backends) - see the same talk of mine >> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA ) but also >> https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=custom%20xcom#custom-backends >> and this nice article from Astronomer >> https://www.astronomer.io/guides/custom-xcom-backends >> >> J. >> >> >> >> On Sat, Feb 5, 2022 at 4:22 PM Giorgio Zoppi <[email protected]> wrote: >>> >>> Hey Jurek, >>> Just a question about the future development, is the XComm backend >>> replaceable now? The real power of Airflow is the 'defacto' the glue >>> between different ways of mangling data, such as Python is the glue when >>> you need to implement things a lower level, ie. Altair Simulation software >>> is written for critical parts in C++ and binded at upper level in Python. >>> Same we can state for Tensorflow, numpy. About my question: it would be >>> nice to have as XComm backend a queue like redis or a non structured >>> database for allowing scaling but i am not sure >>> it it makes sense since XComm is just for short messages. >>> Best Regards, >>> Giorgio >>> >>> >>>
