COMMENT: While writing this answer, I think I found a deeper problem (and a needed optimisation): I think the delete should be even more fine-grained than it is today and include map_index. Please take a look at the end (maybe TP can also comment on that one).
> 1. Additional indexes add additional performance degradation on Insert but
> gain potential improvements on delete and unknown on update, RDBMS still
> require rebalance index and make it consistent to the table.
> 2. LIMIT x OFFSET y could easily become full seq scan, especially if the user
> set a huge number for offset (which? unknown).
> 3. Mixing two indexes could improve performance in a single query but in
> concurrent execution might lead to degradation because it needs to create a
> bitmap table for comparison between these two indexes, as result it might
> lead different issues, such as OOM on DB backend, use swaps or optimiser
> decided that better not to use this indexes.

I think that is all something to be tested with explain plans. I think we would not know before we try - and possibly there are other optimisation approaches. The optimisation I proposed was only the first that came to my mind to avoid the "not in" query. The problem with the "not in" query is that there is no way for the DB to optimise it. Effectively you have to get every record (or index entry) and test it. Maybe it can be done better :). And yes, locking the index with anti-insert locks and the need to rebalance trees during the delete is a concern.

> Is it a real problem? Until we access only by indexes, which doesn't include
> this JSON, it really doesn't matter. I guess we almost always should make a
> UNIQUE INDEX SCAN for SELECT or DELETE (UPDATE) a single record.

Yes, I think so. And while I was not the author of this "cleanup" code, I believe I know the intention. It's not about index size or JSON access. It is about the size of the actual rows and the storage they take - i.e. the general size of the database. The problem is that (especially with dynamic task mapping) it might grow really, really fast. Basically you have NUM_DAGS * NUM_TASKS * NUM_MAP_INDEXES * NUM_TEMPLATED_FIELDS * NUM_RUNS records there.
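To make that product concrete, here is a minimal sketch of the growth arithmetic for a single DAG. The function name and the figures (10 mapped tasks, 100 map indexes, 10 templated fields of about 1 KB each, one run every 10 minutes) are illustrative assumptions taken from the estimate in this mail, not measurements.

```python
def rendered_fields_bytes_per_day(tasks, map_indexes, fields, bytes_per_field, runs_per_day):
    """Rough daily growth of the rendered fields data for one DAG (hypothetical helper)."""
    # Size written by a single DAG run: every mapped task instance stores
    # every templated field once.
    bytes_per_run = tasks * map_indexes * fields * bytes_per_field
    return bytes_per_run * runs_per_day


# One run every 10 minutes.
runs_per_day = 24 * 60 // 10

# Assumed figures: 10 tasks, 100 map indexes, 10 fields, ~1 KB (1000 bytes) per field.
daily = rendered_fields_bytes_per_day(10, 100, 10, 1000, runs_per_day)
print(f"{daily / 1e9:.2f} GB per day")
```

Changing any single factor shows how quickly the total moves, which is the point of the estimate: the table scales with the product, not the sum.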
Back-of-the-envelope calculation

Assume you have a DAG with 10 dynamically mapped tasks, each with 100 map indexes and 10 fields, each field evaluating to a 1K string. Then you have 10 tasks * 100 map indexes * 10 fields * 1K rendered string size = 10MB to store per one(!) run of one(!) DAG. Run it every 10 minutes and your database grows by a whopping 1.44 GB every single day (from a single DAG). This is of course an estimation that assumes a lot, but it's not at all unrealistic.

That's a lot. And if we want the user to do the cleanup then a) they need to know about it b) they need to specifically clean up this table only, because all the other data is relatively small. This table is very specific compared with the other tables. The only reason for it being here is to be able to show the rendered fields in the UI if you go to a specific run of a task. If you clean up other tables you basically lose the history of execution of the tasks: you cannot really know if the data has been processed, you cannot do backfills effectively, you lose all the context. Cleaning this table is merely about the values that have been rendered for a specific run, and the assumption there is that the older it gets, the less interesting it is.

> It is opposite of what we have right now, we scan tables (maybe multiple
> times), read all records tuples which contain JSON.

Not sure if I get the point here :). Yes - in my proposal I think the records will not be touched, only the indexes. So the cleanup should be way faster and contention less of a problem. And due to the way the delete uses < ordering, deadlocks will not be possible at all (as opposed to the current "not in", where there is a very easy way to get into deadlocks when two parallel deletes are trying to delete the same rows in a different sequence). I think my proposal improves all the characteristics of the "cleanup" with very little extra penalty on record creation.
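The proposal discussed above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Airflow implementation: it uses SQLite so that it runs anywhere, the execution_date column on rendered_task_instance_fields is the hypothetical denormalised column from the proposal, the index and function names are made up, and map_index is included in both statements so that each mapped instance only touches its own rows. Whether Postgres or MySQL actually answers these statements from the index still has to be confirmed with explain plans.

```python
import sqlite3


def delete_old_rendered_fields(conn, dag_id, task_id, map_index, keep):
    """Keep the `keep` most recent runs for one (dag, task, map_index); delete older rows."""
    if keep <= 0:
        return 0  # treated here as "cleanup disabled"
    # Step 1: find the execution date of the oldest run we still want to keep.
    row = conn.execute(
        "SELECT DISTINCT execution_date FROM rendered_task_instance_fields "
        "WHERE dag_id = ? AND task_id = ? AND map_index = ? "
        "ORDER BY execution_date DESC LIMIT 1 OFFSET ?",
        (dag_id, task_id, map_index, keep - 1),
    ).fetchone()
    if row is None:
        return 0  # fewer than `keep` runs stored, nothing to delete
    # Step 2: a range delete on the ordered column instead of "NOT IN".
    cur = conn.execute(
        "DELETE FROM rendered_task_instance_fields "
        "WHERE dag_id = ? AND task_id = ? AND map_index = ? AND execution_date < ?",
        (dag_id, task_id, map_index, row[0]),
    )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rendered_task_instance_fields ("
    "dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, "
    "execution_date TEXT, rendered_fields TEXT, "
    "PRIMARY KEY (dag_id, task_id, run_id, map_index))"
)
# Hypothetical composite index supporting both the ordered SELECT and the range DELETE.
conn.execute(
    "CREATE INDEX idx_rtif_cleanup ON rendered_task_instance_fields "
    "(dag_id, task_id, map_index, execution_date)"
)
# Five daily runs of one task with three mapped instances each.
for day in range(1, 6):
    for map_index in range(3):
        conn.execute(
            "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?, ?, ?, ?)",
            ("dag", "task", f"run_{day}", map_index, f"2023-01-0{day}", "{}"),
        )

deleted = delete_old_rendered_fields(conn, "dag", "task", map_index=0, keep=2)
remaining = conn.execute(
    "SELECT map_index, COUNT(*) FROM rendered_task_instance_fields "
    "GROUP BY map_index ORDER BY map_index"
).fetchall()
print(deleted, remaining)
```

Note that the sketch uses OFFSET keep - 1 together with a strict < comparison so that exactly `keep` runs survive; the exact offset and comparison in a real implementation are a design choice that would need to match the meaning of MAX_NUM_RENDERED_TI_FIELDS_PER_TASK.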
> We pay for table/index size linary more records, more size. But other
> operations vary and depend on B-Tree implementation and usually it has
> logarithmic growth. Or do we worry only about table/toast/index size on disk?

Yep. I am worried a lot (and I believe the original author had the same worry) about the size of the table and the fact that this table will be by far the biggest table in our DB, while most of the old records will never be touched. And about the fact that this is the only table that our users will pretty much always have to know about and clean up separately from all the others. Even leaving aside the money spent by our users and the performance degradation that comes with bigger databases, that's a lot of environmental effects that we might incur. Airflow is heavily used; if suddenly all our users start having 10x bigger databases than they have now because we deprecate the values and keep all the history, then a big number of extra disks will have to be used. I'd strongly prefer a solution where we keep the data usage lower in this case.

> If we do not want to grant users the ability to clean up rendered templates
> tables, there could be another option:
> - Do not delete records on every task instance run.
> - Delete once per defined period (hourly, daily, weekly, monthly). In this
> case you really could not care about locks.

Yes, we could come up with a different strategy as to "when" to run the cleanup. This is also a viable option. If you can propose one that will be as adaptive as the current solution, I am all ears. Basically my goal is to keep the usage of the table low, possibly controlled by the same parameter we had. How we do it is a different story. If we, for example, add a thread in the scheduler that performs such cleanup effectively in parallel and scales, I am happy with that.
But I am trying to get into the head of the author, to understand why the original implementation was done this way. I believe (and maybe those who remember it better could confirm it) that distributing the deletion to tasks, so that each task cleans up after itself, is a better idea than centralising the cleanup. This makes each cleanup smaller, locks are held for a shorter time (at least that was the assumption when no full table scan was used), it is more "immediate" and you do not have to decide upfront what the cleanup frequency should be. It seems this is the most logical approach to keep the "MAX_NUM_RENDERED_TI_FIELDS_PER_TASK" promise. Simply, after a task is complete, you can be sure that there are no more than this number of fields per task in the DB. With a scheduled run, that would be a much more "eventual" consistency and it would potentially fluctuate much more.

But there are risks involved in having a single thread doing the cleanup. I think it has a huge risk of being a "stop-the-world" and "deadlock-prone" kind of event if, in big instances, there are a lot of rows to clean up in a single pass. When you delete entries from a table, anti-insert locks are created for the existing index entries, which makes it possible to roll back the whole DELETE transaction. This means that when you try to insert a row with the same index key, the insert will be held until the lock is released. And this means that when you run a single huge DELETE for multiple rows, affecting multiple (all?) index keys matching the select query, it will effectively block inserts of new rows whose index keys match the SELECT. It would mean that if you have some tasks running while deleting existing run_id rendered fields, then you could REALLY start having deadlocks on those tasks trying to insert rendered task instance rows. That's why I think the only viable strategy for a single "cleanup" thread is to do such cleanup as a separate DELETE for each "dag/task/map_index/run", in order to avoid such deadlocks.
That effectively turns into what we have currently - the only difference is that currently those transactions are done by tasks, not by a single cleanup thread.

Also, using tasks to delete old rows is more "effective" when you have vast differences in the frequency of DAGs. Naturally, when you do it in a task, you will only do it "when needed" for a given DAG + Task. If you try to centralize the cleanup, unless you somehow include the schedule and frequency of each DAG, you are going to check every DAG every time you run the cleanup. No matter if a DAG is run daily or every minute, you will have to run the cleanup frequently enough to match your most frequent DAGs. If you have 1000 DAGs that run hourly and one DAG that runs every minute, then you have to run a cleanup job that scans all DAGs every few minutes. That's a big waste.

So I am not sure if we gain anything by centralizing the cleanup. Decentralising it to the Task seems to be a well thought out and sound decision (but I think the problem we have now is that we need to optimize it after Dynamic Task Mapping has been added).

ANOTHER FINDING:

While looking at the code more closely and discussing it, I **think** there is another problem that we have to fix regardless of the solution. I THINK a problem we might have now is that we do not include map_index in this DELETE. We currently delete all the rendered task fields without including map_index, and for big dynamic tasks it means that exactly the same DELETE query is run by every single mapped instance of that task. That is where a lot of contention and locking might happen (basically when a single task instance does the delete, anti-insert locks hold back the other mapped instances of the same task from inserting rendered fields).

It does not change much in my optimisation proposal, other than that we should include map_index in those queries. But I think this might cause a lot of delays in the current implementation.

J.
> ---- > Best Wishes > Andrey Anshin > > > > On Mon, 30 Jan 2023 at 23:51, Jarek Potiuk <[email protected]> wrote: >> >> I think there is a good reason to clean those up automatically. >> rendered task instance fields are almost arbitrary in size. If we try >> to keep all historical values there by default, there are numerous >> cases it will grow very fast - far, far too quickly. >> >> And I am not worried at all about locks on this table if we do it the >> way I described it and it uses the indexes. The contention this way >> might only be between the two deleting tasks. and with the query I >> proposed, they will only last for a short time - the index will be >> locked when two DELETES or SELECT DISTINCT - which should both be >> fast. >> >> >> On Mon, Jan 30, 2023 at 8:37 PM Andrey Anshin <[email protected]> >> wrote: >> > >> > I guess two things involved to reduce performance on this query through >> > the time: Dynamic Task Mapping and run_id instead of execution date. >> > >> > I still personally think that changing the default value from 30 to 0 >> > might improve performance of multiple concurrent tasks, just because this >> > query does not run and there are no locks on multiple records/pages. >> > >> > I do not have any proof (yet?) other than simple DAGs. I think that there >> > is some cross point exists when keeping this table growth worse rather >> > than cleanup for each TI run. But users have ability to cleanup table by >> > execute airflow db clean which should improve performance again >> > >> > And also there is interesting behavior with this query: if user already >> > have more that value specified by >> > AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK and tried run backfill >> > than rendered templates not written to table (or may be inserted and after >> > that immediately deleted), the same is valid for cleanup old tasks. 
>> > >> > ---- >> > Best Wishes >> > Andrey Anshin >> > >> > >> > >> > On Sun, 29 Jan 2023 at 14:16, Jarek Potiuk <[email protected]> wrote: >> >> >> >> Yep. Agree this is not an efficient query and dynamic task mapping >> >> makes the effect much worse. Generally speaking, selecting "what >> >> should be left" and then deleting stuff where the key is "not in" is >> >> never an efficient way of running an sql query. And the query not >> >> using index at all makes it rather terrible. >> >> >> >> I think we should not deprecate it though, but find a more efficient >> >> way of deleting the old keys. I think we could slightly denormalize >> >> RenderedTaskInstance + DagRun tables, and add DAG_RUN_EXECUTION_DATE >> >> to the RenderedTaskInstance table and that will be enough to optimise >> >> it. >> >> >> >> Then we could have either: >> >> >> >> * a composite B-TREE indexed (non-unique) index on DAG_ID, TASK_ID, >> >> RUN_ID_EXECUTION_DATE >> >> * or maybe even regular HASH index on DAG_ID, TASK_ID and separate >> >> B-TREE index (non-unique) on just RUN_ID_EXECUTION_DATE >> >> >> >> Probably the latter is better as I am not sure how < , > comparison >> >> looks like for composite B-TREE indexes when char + date columns are >> >> mixed. Also we could have hit the infamous MySQL index key length >> >> limit. 
>> >> >> >> Then deletion process would look roughly like: >> >> >> >> 1) dag_run_execution_date = SELECT RUN_ID_EXECUTION_DATE FROM >> >> RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>, >> >> TASK_ID=<TASK_ID> ORDER BY RUN_ID_EXECUTION_DATE GROUP BY >> >> RUN_ID_EXECUTION_DATE DESC LIMIT 1 OFFSET >> >> <MAX_NUM_RENDERED_TI_FIELDS_PER_TASK> >> >> 2) DELETE FROM RENDERED_TASK_INSTANCE_FIELDS WHERE DAG_ID =<DAG_ID>, >> >> TASK_ID=<TASK_ID> AND RENDER_TIME < dag_run_execution_date >> >> >> >> I believe that would be fast, and it would use the B-TREE index >> >> features nicely (ordering support) >> >> >> >> J >> >> >> >> On Sun, Jan 29, 2023 at 2:09 AM Andrey Anshin <[email protected]> >> >> wrote: >> >> > >> >> > First of all I want to highlight that this approach I guess worked well >> >> > until Dynamic Task Mappings introduced. >> >> > >> >> > > The main reason for adding that cleanup was -- if you don't do that, >> >> > > you will have many rows, similar to the TaskInstance table >> >> > >> >> > The problem itself is not how big your table/indexes, rather then what >> >> > kind of operation you run. >> >> > >> >> > > Do you have any data for locks or performance degradation? >> >> > >> >> > In this case if we try to clean up rendered_task_instance_fields table >> >> > when a new TI is created/cleared we make almost two full/sequential >> >> > scans (note: need to check) against the table without any index usage, >> >> > so we pay here a couple times: >> >> > 1. We scan without indexes - not all parts of the composite key are >> >> > included to query, plus we need to filter everything except 30 records >> >> > with order and distinct >> >> > 2. After that we make another full scan for find 1 record or map_size >> >> > records >> >> > >> >> > And I guess the situation becomes worse if you have a lot of tasks, >> >> > even if we have a small table, we need to do ineffective operations. 
>> >> > >> >> > That how looks like Query Plan (please note without commit transaction >> >> > DELETE operation doesn't have all information): >> >> > https://gist.github.com/Taragolis/3ca7621c51b00f077aa1646401ddf31b >> >> > >> >> > In case if we do not clean up the table, we only use these operations: >> >> > 1. SELECT single record by index >> >> > 2. INSERT new record >> >> > 3. DELETE old record(s), which were found by index. >> >> > >> >> > I have not done any real tests yet, only synthetic DAGs (so we should >> >> > not consider to use any findings as totally truth): >> >> > https://gist.github.com/Taragolis/6eec9f81efdf360c4239fc6ea385a480 >> >> > DAG with parallel tasks: degradation up to 2-3 times >> >> > DAG with single map tasks: degradation up to 7-10 times >> >> > >> >> > I have a plan for more complex and more close to real use cases with >> >> > Database which do not have network latency almost 0 as I have in my >> >> > local. >> >> > But I will not refuse if someone also does their tests with >> >> > AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK=0 vs default value. >> >> > >> >> > About deadlock we know that it exists at least in MySQL: >> >> > https://github.com/apache/airflow/pull/18616 >> >> > >> >> > > And the larger tables create problems during database migrations. >> >> > >> >> > That is a very good point, so if we found that problem only related to >> >> > migrations we could: >> >> > 1. Cleanup this table in migration >> >> > 2. Add cli command to airflow db which could cleanup only rendered >> >> > fields, so it would be user's choice cleanup or not before migration, >> >> > do periodical maintenance or not >> >> > >> >> > >> >> > ---- >> >> > Best Wishes >> >> > Andrey Anshin >> >> > >> >> > >> >> > >> >> > On Sat, 28 Jan 2023 at 23:41, Kaxil Naik <[email protected]> wrote: >> >> >>> >> >> >>> Potentially it is a good idea to deprecate this option and recommend >> >> >>> for users to set it to 0? WDYT? 
Maybe someone has already tried or >> >> >>> investigated this? >> >> >> >> >> >> >> >> >> The main reason for adding that cleanup was -- if you don't do that, >> >> >> you will have many rows, similar to the TaskInstance table. And the >> >> >> RenderedTIFields were mainly added for checking rendered TI fields on >> >> >> the Webserver only because after DAG Serialization, the webserver >> >> >> won't have access to DAG files. >> >> >> >> >> >> And the larger tables create problems during database migrations. >> >> >> >> >> >> Do you have any data for locks or performance degradation? >> >> >> >> >> >> >> >> >> >> >> >> On Sat, 28 Jan 2023 at 13:06, Andrey Anshin <[email protected]> >> >> >> wrote: >> >> >>> >> >> >>> Greetings! >> >> >>> >> >> >>> During migrating our ORM syntax to compatible with SQLAlchemy 2.0 I >> >> >>> probably found skeletons in the closet. >> >> >>> >> >> >>> Let's start from the beginning, initially I got this warning >> >> >>> >> >> >>> airflow/models/renderedtifields.py:245 RemovedIn20Warning('ORDER BY >> >> >>> columns added implicitly due to DISTINCT is deprecated and will be >> >> >>> removed in SQLAlchemy 2.0. 
SELECT statements with DISTINCT should be >> >> >>> written to explicitly include the appropriate columns in the columns >> >> >>> clause (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)') >> >> >>> >> >> >>> "OK let's fix it!", I thought at first and started to investigate >> >> >>> RenderedTaskInstanceFields model >> >> >>> >> >> >>> Skeleton #1: >> >> >>> >> >> >>> When I first time look on the code and comments it got me to thinking >> >> >>> that part which keep only latest N Rendered Task Fields potentially >> >> >>> could lead different performance degradation (Locks, Dead Locks, Data >> >> >>> Bloating): see code >> >> >>> https://github.com/apache/airflow/blob/6c479437b1aedf74d029463bda56b42950278287/airflow/models/renderedtifields.py#L185-L245 >> >> >>> >> >> >>> Also this historical part (from Airflow 1.10.10) generate this SQL >> >> >>> Statement (pg backend) >> >> >>> >> >> >>> DELETE FROM rendered_task_instance_fields >> >> >>> WHERE rendered_task_instance_fields.dag_id = %(dag_id_1) s >> >> >>> AND rendered_task_instance_fields.task_id = %(task_id_1) s >> >> >>> AND ( >> >> >>> ( >> >> >>> rendered_task_instance_fields.dag_id, >> >> >>> rendered_task_instance_fields.task_id, >> >> >>> rendered_task_instance_fields.run_id >> >> >>> ) NOT IN ( >> >> >>> SELECT >> >> >>> anon_1.dag_id, >> >> >>> anon_1.task_id, >> >> >>> anon_1.run_id >> >> >>> FROM >> >> >>> ( >> >> >>> SELECT DISTINCT >> >> >>> rendered_task_instance_fields.dag_id AS dag_id, >> >> >>> rendered_task_instance_fields.task_id AS task_id, >> >> >>> rendered_task_instance_fields.run_id AS run_id, >> >> >>> dag_run.execution_date AS execution_date >> >> >>> FROM rendered_task_instance_fields >> >> >>> JOIN dag_run ON rendered_task_instance_fields.dag_id = >> >> >>> dag_run.dag_id >> >> >>> AND rendered_task_instance_fields.run_id = dag_run.run_id >> >> >>> WHERE >> >> >>> rendered_task_instance_fields.dag_id = %(dag_id_2) s >> >> >>> AND rendered_task_instance_fields.task_id = 
%(task_id_2) s >> >> >>> ORDER BY >> >> >>> dag_run.execution_date DESC >> >> >>> limit %(param_1) s >> >> >>> ) AS anon_1 >> >> >>> ) >> >> >>> ) >> >> >>> >> >> >>> Which is especially not effective in PostgreSQL. When IN SUBQUERY >> >> >>> could be easily transform internaly into SEMI-JOIN (aka EXISTS >> >> >>> clause), but it is not working for NOT IN SUBQUERY because it is not >> >> >>> transformed into ANTI JOIN (aka NOT EXISTS clause) even if it >> >> >>> possible, see: https://commitfest.postgresql.org/27/2023/ >> >> >>> >> >> >>> I didn't do any performance benchmarks yet but I guess if users set >> >> >>> AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK to 0 rather than >> >> >>> default 30 it could improve performance and reduce number of >> >> >>> DeadLocks, however the table size will increase but I think we don't >> >> >>> do any maintenance job for other tables. >> >> >>> >> >> >>> Potentially it is a good idea to deprecate this option and recommend >> >> >>> for users to set it to 0? WDYT? Maybe someone has already tried or >> >> >>> investigated this? >> >> >>> >> >> >>> >> >> >>> Skeleton #2: >> >> >>> >> >> >>> We have a k8s_pod_yaml field which is exclusively used by K8S >> >> >>> executors. >> >> >>> >> >> >>> Should we also decouple this field as part of AIP-51? >> >> >>> >> >> >>> ---- >> >> >>> Best Wishes >> >> >>> Andrey Anshin >> >> >>>
