Hi Peter,

Thanks for sharing the context around the Flink streaming use case and the side note on concurrent writes. Apologies for the delay, as I just got back from a vacation.

Yeah, I agree, having the index at the partition level is a better approach if we plan to use caching, since a distributed cache would introduce additional system overhead, and maintaining separate caches on each node could lead to redundant storage footprints.

But having one index file for the entire partition is not feasible: rewriting the index file for an entire partition on every write operation is expensive and could introduce significant write latency.

If we want to use indexes at planning time, we would need a mapping from primary keys to data files and row positions for the whole partition. This can be constructed by aggregating file-level indexes at the planner. Even though each index file corresponds to a single data file, we can use a thread pool to load them in parallel, so compared to loading one large index file, the performance could be similar. And once the index files are cached in memory or on disk, index loading time becomes negligible. In fact, having one index file per data file might be advantageous, as it allows for incremental loading.

One more thing about file-level indexing: the planner or task node is not required to scan all index files upfront. Partition pruning and file filtering should occur prior to index access, allowing us to load only the relevant index files associated with the pruned data set.
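
To make that concrete, here is a rough sketch of what the planner-side aggregation could look like. Everything below is hypothetical (the Position record, the loadKeyIndex() helper, and the ".keyidx" naming are placeholders, not existing Iceberg APIs), and it assumes each primary key has at most one live row across the pruned data files:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical sketch: build a partition-wide primary-key index by aggregating
    // the file-level indexes of the already pruned data files in parallel.
    public class PlannerIndexAggregationSketch {

      // Where a primary key currently lives: data file path and row position.
      record Position(String dataFile, long rowPos) {}

      // Placeholder for reading one file-level index; assumed to return key -> row position.
      static Map<String, Long> loadKeyIndex(String indexFilePath) {
        return Map.of(); // a real implementation would read the index file here
      }

      static Map<String, Position> aggregate(List<String> prunedDataFiles) throws Exception {
        Map<String, Position> merged = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
          List<Future<?>> tasks = new ArrayList<>();
          for (String dataFile : prunedDataFiles) {
            tasks.add(pool.submit(() -> {
              // Index files are assumed to be 1:1 with data files, e.g. <dataFile>.keyidx.
              Map<String, Long> keyIndex = loadKeyIndex(dataFile + ".keyidx");
              keyIndex.forEach((key, pos) -> merged.put(key, new Position(dataFile, pos)));
            }));
          }
          for (Future<?> task : tasks) {
            task.get(); // propagate any load failure
          }
        } finally {
          pool.shutdown();
        }
        return merged;
      }
    }

Because the index files are loaded only after partition pruning and file filtering, the number of index files read scales with the pruned file set, not with the whole table.
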
Another concern I have with the caching approach is determining which partitions to cache, since caching indexes for the entire table isn't practical. For time-based partitions, this might be more straightforward, for example, caching the most recent partitions. However, that's not always applicable for all use cases.

If, after prototyping, we find the solution isn't performant enough for the streaming use case and we still want to handle it, we could explore a hybrid approach of Option 2 and Option 3 that Anton mentioned.

Also, thanks to Ismail for highlighting the BigQuery approach, that's helpful context!

Thanks,
Xiaoxuan

On Wed, May 14, 2025 at 3:39 AM ismail simsek <ismailxsim...@gmail.com> wrote:

> Hi All, Thank you for working on this.
>
> I wanted to share a reference to the BigQuery implementation <https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness> (Option 3) as another potential approach, and for inspiration. In this setup, the engine runs periodic merge jobs that apply equality deletes to the actual table based on the PK, and in some cases applies them at runtime.
>
> https://cloud.google.com/bigquery/docs/change-data-capture#query-max-staleness
>
> Best regards
> ismail
>
> On Wed, May 14, 2025 at 7:37 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Xiaoxuan,
>>
>> Let me describe how the Flink streaming writer uses equality deletes, and how it could use indexes.
>>
>> When the Flink streaming writer receives a new insert, it appends the data to a data file. When it receives a delete, it appends the primary key to an equality delete file. When it receives an update, it creates both a data and a delete record.
>>
>> If we have an index available, the writer could rely on the index to get the position of the deleted record, and write a position delete instead of an equality delete.
>>
>> This index lookup is only effective if we can keep the index in memory and distribute the incoming records between the writers so that the index cache is used. In this case, the lookup cost becomes O(1) if we consider only the file access cost. If we need to read an index file for every data file, we are adding an O(n) delay at the record level, where n is the number of data files in the table.
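
For reference, the write path Péter describes could look roughly like the sketch below. The in-memory key index and the writer hooks are hypothetical stand-ins, not existing Iceberg or Flink APIs:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of an index-backed streaming upsert writer.
    public class IndexedUpsertWriterSketch {

      // Latest known location of each primary key: data file path and row position.
      record Position(String dataFile, long rowPos) {}

      // Assumed to be a cached, in-memory view of the key index for this writer's key range.
      private final Map<String, Position> keyIndex = new HashMap<>();

      // Stand-ins for the real delete/data writers.
      void writePositionDelete(Position previous) { /* append (file, pos), i.e. a DV entry */ }
      void writeEqualityDelete(String key) { /* append the key to an equality delete file */ }
      Position appendDataRow(String key, Object row) {
        // Append the row to the open data file and return its location.
        return new Position("current-data-file.parquet", 0L);
      }

      // Upsert: an O(1) in-memory lookup lets the writer emit a position delete for the
      // previous copy of the key instead of an equality delete.
      void upsert(String key, Object row) {
        Position previous = keyIndex.get(key);
        if (previous != null) {
          writePositionDelete(previous);
        } else {
          writeEqualityDelete(key); // fallback when the key is not covered by the cached index
        }
        keyIndex.put(key, appendDataRow(key, row));
      }
    }

If the cached index cannot cover every key, the equality-delete fallback keeps the writer correct, at the cost of the read-side overhead discussed in this thread.
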
>> >> I agree with you that the cost of rewriting the index file is not >> trivial, but that happens on commit level. This is much better than having >> an overhead on record level. >> >> > As a result, we’d likely need to split and read the index file in >> distributed planning mode >> >> That is why I was suggesting "index-partition"s. I don't have a ready >> answer for this, but making sure that the index split fits into the memory >> is important for Flink as well. >> >> > However the drawback of partition-level index is that the index must >> always be kept up to date to remain useful. >> >> I don't see how we can avoid that in case of Flink writers. The >> alternative is to read the non-indexed files in every writer, which seems >> like a no-go for me. >> >> > the value of an inverted index is not column pruning. It’s how it >> enables fast point lookups >> >> In my experience, the biggest gains are realized during planning, when >> the planner prunes whole files. While there is a possibility for a >> distributed planner to read fewer number of index files, I don't think it >> is possible for a planner to read the index files if they are stored for >> every data file. (Unless we are talking about a catalog which merges/caches >> the relevant part of the indexes. Sadly this is something which is >> available for the Flink writers) >> >> Side note: The index solution still prevents (or makes complicated) to >> handle concurrent writes to the table. Currently, if a concurrent writer >> updated the record, Flink just updated it again, and only a single record >> remains with the doubly updated primary key. With the index based solution, >> we might end up with duplicated keys. This might be an acceptable tradeoff, >> but we should be aware of it. >> >> Thanks for working on this Xiaoxuan! >> >> On Wed, May 14, 2025, 05:25 Xiaoxuan Li <xiaoxuan.li....@gmail.com> >> wrote: >> >>> Hi Peter, >>> >>> Thanks for the detailed illustration. I understand your concern. >>> >>> I believe the core question here is whether the index is used during job >>> planning or at the scan task. This depends on how index files are >>> referenced, at the file level or partition level. >>> >>> In my view, both approaches ultimately serve the same purpose. The main >>> difference lies in how the index files are generated and split. >>> >>> For a partition-level index, would we generate a single index file per >>> partition? If so, each update to the partition would require either >>> rewriting a whole new index file, which could be costly at write time given >>> that the index size grows along with data size, or appending a new index >>> file per write operation, which would functionally be similar to a >>> file-level index. >>> >>> And due to the potentially large size of index files, even one index >>> file per partition may not support efficient planning in local planning >>> mode. As a result, we’d likely need to split and read the index file in >>> distributed planning mode, making it functionally equivalent to reading >>> multiple index files at the task level. >>> >>> However the drawback of partition-level index is that the index must >>> always be kept up to date to remain useful. >>> >>> Also, Steven, in my opinion, the value of an inverted index is not >>> column pruning. It’s how it enables fast point lookups. As mentioned >>> earlier, we just need to ensure that index lookups are faster than file >>> scan with delete predicate evaluation. >>> >>> Let me know whether this addresses any of the concerns. 
>>>
>>> Thanks,
>>>
>>> Xiaoxuan
>>>
>>> On Mon, May 12, 2025 at 4:34 PM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Agree with Peter that a 1:1 mapping of data files and inverted indexes is not as useful. With a columnar format like Parquet, this can also be achieved equivalently by reading the data file with a projection on the identifier columns.
>>>>
>>>> On Mon, May 12, 2025 at 4:20 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>
>>>>> Hi Xiaoxuan,
>>>>>
>>>>> Do we plan to store the indexes in a separate file alongside the data files? If so, then I have the following thoughts:
>>>>> - I agree that the 1-on-1 mapping of data files and index files is easy to maintain; OTOH it is less useful as an index.
>>>>> - The writer (which is looking for a row with a specific primary key) needs to open all of the index files until it finds the given key. Since the index files are typically small, the cost here is O(n), where n is the number of index files (equal to the number of data files).
>>>>> - If we add a Bloom filter on the primary key to the data files, then the writer reads the footer of every data file and reads only the file which contains the primary key. This is also O(n), where n is the number of data files.
>>>>>
>>>>> So, IMHO, having a 1-on-1 mapping between the data files and the index files is not too beneficial.
>>>>>
>>>>> OTOH, if we maintain the index files at the "index-partition" level, then the cost becomes O(p), where p is the number of "index-partitions", which could be significantly lower. Notice that I used "index-partition". During the design we can decide to use the table partitioning for index partitioning as well. This requires us to use easily maintainable indexes.
>>>>>
>>>>> So while maintaining the index is more costly if it contains data from multiple data files, it also becomes much more useful. Maintenance procedures could build upon the data file immutability and simplify the index maintenance.
>>>>>
>>>>> Thanks Xiaoxuan for the good conversation!
>>>>> Peter
>>>>>
>>>>> Xiaoxuan Li <xiaoxuan.li....@gmail.com> wrote on Sat, May 10, 2025 at 9:13:
>>>>>
>>>>>> Thanks Anton for the context and summary of the options, great to hear that this direction aligns with earlier community discussions. And thanks Gyula and Peter for the clear analysis. I agree with both of you, the index needs to be designed and implemented efficiently in order to scale for large data sets and streaming use cases.
>>>>>>
>>>>>> I wanted to share some context around why I proposed a file-level index.
>>>>>>
>>>>>> The benefit of a file-level index is that it naturally supports partition pruning, file skipping and time travel. Issues like index invalidation are inherently handled because the index is tied to the data file’s lifecycle. Even if the index isn't always perfectly in sync, it's still usable, and since its lifecycle is bound to the file, lifecycle management only needs to track the file itself, reducing complexity in index management and maintenance.
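
As a small illustration of that lifecycle argument, a per-file metadata entry could carry the index reference next to the data file, so whatever prunes the data file prunes its index as well. The shape below is hypothetical, not the actual Iceberg manifest schema:

    import java.util.List;

    // Hypothetical per-file metadata carrying an index reference alongside the data file.
    public class FileLevelIndexLifecycleSketch {

      record DataFileEntry(String dataFilePath, String partitionValue, String keyIndexPath) {}

      // Whatever planning step keeps or drops a data file entry (partition pruning, file
      // filtering, snapshot selection) keeps or drops its index reference with it, so no
      // separate invalidation or time-travel logic is needed for the index.
      static List<String> indexFilesToLoad(List<DataFileEntry> prunedEntries) {
        return prunedEntries.stream().map(DataFileEntry::keyIndexPath).toList();
      }
    }
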
>>>>>> On the other hand, partition/table-level indexes must explicitly manage synchronization with the table, handling updates/snapshots on the writer side, and partition pruning, file skipping, time travel, and index invalidation on the reader side. Since index files are immutable and grow alongside data files, maintaining them can become as complex as managing the entire table. It’s not surprising that Hudi uses a metadata table (which is essentially a Hudi MOR table) to manage its index. Personally, I find this approach less appealing, as it introduces a circular dependency that adds architectural complexity.
>>>>>>
>>>>>> For the positional delete case, even if index lookups could be faster than scanning files with predicates, it's still unclear whether that alone is sufficient for streaming workloads, especially when many files are involved. I think Peter’s idea of maintaining a hot cache of the index within the streaming engine is promising. Alternatively, using an external key-value store for fast lookups could also be explored. Would be great to hear others’ thoughts on this.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Xiaoxuan
>>>>>>
>>>>>> On Fri, May 9, 2025 at 8:12 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>>> When going through the options mentioned by Anton, I feel that Options 1 and 4 are just pushing the responsibility of converting the equality deletes to positional deletes to the engine side. The only difference is whether the conversion happens on the write side or on the read side. This is a step back, and doesn't help solve the problem for streaming engines.
>>>>>>>
>>>>>>> Option 2 needs a bit of thought. We need to find a good indexing strategy which allows big tables and plays nice with streaming writes too. It would be good to allow the engines to cache a limited part of the index, and distribute records to the writers in a way that takes cache locality into account. Also, creating an index file for every data file might be suboptimal, as reading that many index files could tank performance.
>>>>>>>
>>>>>>> I have yet to see a good idea for Option 3.
>>>>>>>
>>>>>>> Another option could be to provide a way to convert the equality deletes to positional deletes as soon as possible. Maybe in frequent compaction tasks, or at first read? When we first apply an equality delete for a file, it is very easy to calculate the equivalent DV for the equality delete. Subsequent readers could depend on the DV instead of the equality delete.
>>>>>>> This could be the least disruptive change, but I see a few issues with this solution as well:
>>>>>>> - First read still depends on the equality delete, which in edge cases could be very costly
>>>>>>> - Filters don't play well with this method
>>>>>>>
>>>>>>> All in all, I like the index solution (Option 2) for several reasons:
>>>>>>> - Cuts out the need for equality deletes, which would reduce the code quite a bit
>>>>>>> - While it slows down the write path a bit, the overall workflow (write+read) benefits from it
>>>>>>> - The index could be reused for other use-cases as well
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Peter
>>>>>>>
>>>>>>> Gyula Fóra <gyula.f...@gmail.com> wrote on Fri, May 9, 2025 at 12:16:
>>>>>>>
>>>>>>>> Hi Anton,
>>>>>>>>
>>>>>>>> Thank you for summarizing the options we see at this stage in a structured and concise way.
>>>>>>>>
>>>>>>>> Based on the use-cases I see in the industry, I feel that not all of the highlighted options are feasible (or desirable).
>>>>>>>>
>>>>>>>> Option 4 would basically remove any possibility for native streaming CDC reads on tables, severely limiting how Iceberg can be used in the future for real-time use-cases from Flink and other similar engines that may want to connect. I understand that the view reconciliation approach is implemented in Spark SQL already, but implementing it in a proper streaming way would likely lead to problems similar to the ones we are trying to solve here in the first place.
>>>>>>>>
>>>>>>>> Regarding Option 1, introducing engine-specific/custom indexing solutions would go against the core design principles, as it would be hard to mix different engines when writing/reading tables for CDC use-cases. (A streaming job would have a hard time writing upserts/equality deletes to tables that were written by a different engine.) To me this sounds very similar to Option 4, in that it pushes too much logic to the engines in a way that would hurt compatibility across the engines.
>>>>>>>>
>>>>>>>> Based on this and the context of the discussion, Option 2 or a combination of Options 2 & 3 sounds the most reasonable to me. There are still a lot of questions on the practical implementation of the indexes and how we can do this efficiently, so this is only very early feedback from my end.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Fri, May 9, 2025 at 12:14 AM Anton Okolnychyi <aokolnyc...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am glad to see that folks are thinking about this problem. I am looking forward to a formal proposal/design doc to discuss details!
>>>>>>>>>
>>>>>>>>> Overall, this aligns with what we discussed in the community earlier w.r.t. the future of equality deletes and streaming upserts. If I were to summarize, we have the following options:
>>>>>>>>>
>>>>>>>>> Option 1: Add an inverted index (potentially distributed) maintained by an engine that does streaming writes to always produce DVs, even in streaming use cases. Deprecate/remove equality deletes from Iceberg.
>>>>>>>>> Option 2: Add native indexing to Iceberg so that the lookup of positions is quick and efficient enough to be used in streaming upserts. Deprecate/remove equality deletes from Iceberg.
>>>>>>>>> Option 3: Rethink equality deletes, potentially by introducing more restrictions and trying to scope them to particular data files, similar to DVs.
>>>>>>>>> Option 4: Standardize on a view reconciliation approach that Tabular implemented for CDC.
>>>>>>>>>
>>>>>>>>> I would like to highlight that what Spark does today during MERGE is similar to a lookup in an inverted index represented by another Iceberg table. That is OK for batch jobs but not enough for streaming.
>>>>>>>>>
>>>>>>>>> - Anton
>>>>>>>>>
>>>>>>>>> On Thu, May 8, 2025 at 10:08 Xiaoxuan Li <xiaoxuan.li....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Zheng, Steven, Amogh and Gyula. Thank you all for the feedback!
>>>>>>>>>>
>>>>>>>>>> I agree with everyone, we need to narrow down the scope of this optimization. The primary issue I'm trying to address is the slow read performance caused by the growing number of equality delete files (streaming CDC scenarios). The other potential use cases are only mentioned to show the extensibility of this approach.
>>>>>>>>>>
>>>>>>>>>> And both equality and positional deletes suffer from the same core problem: records are evaluated repeatedly against multiple delete predicates, at read time for equality deletes, and at write time for positional deletes. This repeated evaluation is where the real bottleneck lies.
>>>>>>>>>>
>>>>>>>>>> It’s particularly bad for equality deletes, since we constantly recompute row positions during reads without ever materializing them. Eventually, a maintenance job is required just to rewrite them into positional deletes.
>>>>>>>>>>
>>>>>>>>>> The idea is to eliminate this overhead by introducing an inverted index that maps values (or value combinations) directly to row positions. This lets us skip full predicate evaluation and jump straight to the affected rows, similar to how Hudi uses a record-level index for fast upserts. If we can fetch row positions from the index, we can greatly reduce overhead during reads (for equality deletes) and writes (for positional deletes).
>>>>>>>>>>
>>>>>>>>>> In fact, if we can make positional deletes perform as well as equality deletes using this index, we might be able to get rid of equality deletes, but that needs to be evaluated for the upsert case.
>>>>>>>>>>
>>>>>>>>>> That’s part of the reason why other types of indexes came up: they’re applicable beyond just primary key columns, but CDC is the only scenario with stricter SLA requirements. So it makes sense to align on the exact problem and use cases. For now, I think we can define our main goal as supporting inverted indexing over primary key columns to address the read slowness caused by the growing number of equality delete files.
>>>>>>>>>>
>>>>>>>>>> Thanks, Amogh, for bringing up the comparison with known alternatives.
>>>>>>>>>> We should include benchmarks for those as well, to illustrate the trade-offs in read/write performance, storage usage, and overall cost.
>>>>>>>>>>
>>>>>>>>>> Really appreciate your feedback! I’ll incorporate these into the next revision of the proposal.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Xiaoxuan
>>>>>>>>>>
>>>>>>>>>> On Thu, May 8, 2025 at 1:25 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you for the proposal!
>>>>>>>>>>>
>>>>>>>>>>> I agree with what has been said above: we need to narrow down the scope here and clarify the primary target for the optimization.
>>>>>>>>>>>
>>>>>>>>>>> As Amogh has also pointed out, CDC (streaming) read performance (with equality deletes) would be one of the biggest beneficiaries of this at first glance. This is especially important for Flink users, where this feature is currently completely missing and there is a big demand for it, as we rely on equality deletes on the write path. [1]
>>>>>>>>>>>
>>>>>>>>>>> I am not aware of alternative proposals that would solve the equality delete CDC read performance question; overall, I think using indices is reasonable and a very promising approach.
>>>>>>>>>>>
>>>>>>>>>>> Looking forward to more details and discussion!
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>> [1] https://lists.apache.org/thread/njmxjmjjm341fp4mgynn483v15mhk7qd
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 8, 2025 at 9:24 AM Amogh Jahagirdar <2am...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the proposal Xiaoxuan! I think I agree with Zheng and Steven's point that it'll probably be more helpful to start out with a more specific "what" and "why" (known areas of improvement for Iceberg, driven by any use cases) before we get too deep into the "how".
>>>>>>>>>>>>
>>>>>>>>>>>> In my mind, the specific known area of improvement for Iceberg related to this proposal is improving streaming upsert behavior. One area where this improvement is beneficial is being able to provide better data freshness for Iceberg CDC mirror tables without the heavy read + maintenance costs that currently exist with Flink upserts.
>>>>>>>>>>>>
>>>>>>>>>>>> As you mentioned, equality deletes have the benefit of being very cheap to write but can come at a high and unpredictable cost at read time. Challenges with equality deletes have been discussed in the past [1]. I'll also add that if one of the goals is to improve streaming upserts (e.g. for applying CDC change streams into Iceberg mirror tables), then there are alternatives that I think we should compare against to make the tradeoffs clear. These alternatives include leveraging the known changelog view or merge patterns [2] or improving the existing maintenance procedures.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the potential for being able to use an inverted index for upsert cases to more directly identify positions in a file and directly write DVs is very exciting, but before getting too far into the weeds, I think it'd first be helpful to make sure we agree on the specific problem we're trying to solve when we talk about performance improvements, along with any use cases, followed by a comparison with known alternatives (ideally we can get numbers that demonstrate the read/write/storage/cost tradeoffs for the proposed inverted index).
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6
>>>>>>>>>>>> [2] https://www.tabular.io/blog/hello-world-of-cdc/
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Amogh Jahagirdar