Hi Peter, Thanks for the detailed illustration. I understand your concern.
I believe the core question here is whether the index is used during job planning or at the scan task. This depends on how index files are referenced, at the file level or partition level. In my view, both approaches ultimately serve the same purpose. The main difference lies in how the index files are generated and split. For a partition-level index, would we generate a single index file per partition? If so, each update to the partition would require either rewriting a whole new index file, which could be costly at write time given that the index size grows along with data size, or appending a new index file per write operation, which would functionally be similar to a file-level index. And due to the potentially large size of index files, even one index file per partition may not support efficient planning in local planning mode. As a result, we’d likely need to split and read the index file in distributed planning mode, making it functionally equivalent to reading multiple index files at the task level. However the drawback of partition-level index is that the index must always be kept up to date to remain useful. Also, Steven, in my opinion, the value of an inverted index is not column pruning. It’s how it enables fast point lookups. As mentioned earlier, we just need to ensure that index lookups are faster than file scan with delete predicate evaluation. Let me know whether this addresses any of the concerns. Thanks, Xiaoxuan On Mon, May 12, 2025 at 4:34 PM Steven Wu <stevenz...@gmail.com> wrote: > agree with Peter that 1:1 mapping of data files and inverted indexes are > not as useful. With columnar format like Parquet, this can also be achieved > equivalently by reading the data file with projection on the identifier > columns. > > > > On Mon, May 12, 2025 at 4:20 AM Péter Váry <peter.vary.apa...@gmail.com> > wrote: > >> Hi Xiaoxuan, >> >> Do we plan to store the indexes in a separate file alongside the data >> files? If so, then I have the following thoughts: >> - I agree that the 1-on-1 mapping of data files and index files is easy >> to maintain OTOH it is less useful as an index. >> - The writer (which is looking for a column with a specific primary key) >> needs to open all of the index files until it finds the given key. Since >> the index files are typically small, the cost here is O(n) where n is the >> number of the index files (equal to the number of the data files). >> - If we add a Bloom filter on the primary key to the data files, then >> the writer reads the footer of every data file and reads only the file >> which contains the primary key. This is also O(n) where the n is the number >> of the data files. >> >> So, IMHO having a 1-on-1 mapping between the data files and the index >> files is not too beneficial. >> >> OTOH if we maintain the index files on "index-partition" level, then the >> cost becomes O(p) where the p is the number of "index-partitions" which >> could be significantly lower. Notice that I used "index-partition". During >> the design we can decide to use the table partitioning for index >> partitioning as well. This requires us to use easily maintainable indexes. >> >> So while maintaining the index is more costly if it contains data from >> multiple data files, it also becomes much more useful. Maintenance >> procedures could build upon the data file immutability and simplify the >> index maintenance. >> >> Thanks Xiaoxuan for the good conversation! >> Peter >> >> >> Xiaoxuan Li <xiaoxuan.li....@gmail.com> ezt írta (időpont: 2025. máj. >> 10., Szo, 9:13): >> >>> Thanks Anton for the context and summary of the options, great to hear >>> that this direction aligns with earlier community discussions. And thanks >>> Gyula and Peter for the clear analysis. I agree with both of you, the index >>> needs to be designed and implemented efficiently in order to scale for >>> large data sets and streaming use cases. >>> >>> I wanted to share some context around why I proposed a file-level index. >>> >>> The benefit of a file-level index is that it naturally supports >>> partition pruning, file skipping and time travel. Issues like index >>> invalidation are inherently handled because the index is tied to the data >>> file’s lifecycle. Even if the index isn't always perfectly in sync, it's >>> still usable, and since its lifecycle is bound to the file, meaning >>> lifecycle management only needs to track with the file itself, reducing >>> complexity in index management and maintenance. >>> >>> On the other hand, partition/table level indexes must explicitly manage >>> synchronization with the table, handling updates/snapshots on the writer >>> side, and partition pruning, file skipping, time travel, and index >>> invalidation on the reader side. Since index files are immutable and grow >>> alongside data files, maintaining them can become as complex as managing >>> the entire table. It’s not surprising that Hudi uses a metadata table(which >>> is essentially a Hudi MOR table) to manage its index. Personally, I find >>> this approach less appealing, as it introduces a circular dependency that >>> adds architectural complexity. >>> >>> For the positional delete case, even if index lookups could be faster >>> than scanning files with predicates, it's still unclear whether that alone >>> is sufficient for streaming workloads, especially when many files are >>> involved. I think Peter’s idea of maintaining a hot cache of the index >>> within the streaming engine is promising. Alternatively, using an external >>> key-value store for fast lookups could also be explored. Would be great to >>> hear others’ thoughts on this. >>> >>> >>> Thanks, >>> >>> Xiaoxuan >>> >>> On Fri, May 9, 2025 at 8:12 AM Péter Váry <peter.vary.apa...@gmail.com> >>> wrote: >>> >>>> When going through the options mentioned by Anton, I feel that Option 1 >>>> and 4 are just pushing the responsibility of converting the equality >>>> deletes to positional deletes to the engine side. The only difference is >>>> whether the conversion happens on the write side or on the read side. This >>>> is a step back, and doesn't help solving the problem for streaming engines. >>>> >>>> Option 2 needs a bit of thought. We need to find a good indexing >>>> strategy which allows big tables and plays nice with streaming writes too. >>>> It would be good to allow the engines to cache a limited part of the index, >>>> and distribute records to the writers in a way that the cache locality is >>>> considered. Also creating an index file for every data file might be >>>> suboptimal, as reading that many index files could tank performance. >>>> >>>> I have yet to see a good idea for Option 3. >>>> >>>> Another option could be to provide a way to convert the equality >>>> deletes to positional deletes as soon as possible. Maybe in frequent >>>> compaction tasks, or at first read? When we first apply an equality delete >>>> for a file, it is very easy to calculate the equivalent DV for the equality >>>> delete. Subsequent readers could depend on the DV instead of the equality >>>> delete. This could be the least disruptive change, but I see a few issues >>>> with this solution as well: >>>> - First read still depends on the equality delete which in edge cases >>>> could be very costly >>>> - Filters don't play well with this method >>>> >>>> All-in-all, I like the index solution (Option 2) for several reasons: >>>> - Cuts out the need for equality deletes which would reduce the code >>>> quite a bit >>>> - While slows down the write path a bit, the overall workflow >>>> (write+read) benefits from it >>>> - The index could be reused for other use-cases as well >>>> >>>> Thanks, >>>> Peter >>>> >>>> Gyula Fóra <gyula.f...@gmail.com> ezt írta (időpont: 2025. máj. 9., P, >>>> 12:16): >>>> >>>>> Hi Anton, >>>>> >>>>> Thank you for summarizing the options we see at this stage in a >>>>> structured and concise way. >>>>> >>>>> Based on the use-cases I see in the industry, I feel that not all of >>>>> the highlighted options are feasible (or desirable). >>>>> >>>>> >>>>> Option 4 would basically remove any possibilities for native streaming >>>>> CDC reads on tables, severely limiting how Iceberg can be used in the >>>>> future for real-time use-cases from Flink and other similar engines >>>>> that may want to connect. I understand that the view reconciliation >>>>> approach is implemented in Spark SQL already but implementing it in a >>>>> proper streaming way would likely lead to similar problems that we are >>>>> trying to solve here in the first place. >>>>> >>>>> >>>>> Regarding Option 1, introducing engine specific/custom indexing >>>>> solutions would go against the core design principles as it would be hard >>>>> to mix different engines when writing / reading tables for CDC use-cases. >>>>> (A streaming job would have a hard time writing upserts/equality deletes >>>>> to >>>>> tables that were written by a different engine). To me this sounds >>>>> very similar to Option 4 in a way that it pushes too much logic to the >>>>> engines in a way that would hurt the compatibility across the engines. >>>>> >>>>> >>>>> Based on this and the context of the discussion, Option 2 or a >>>>> combination of Option 2 & 3 sounds the most reasonable to me. There are >>>>> still a lot of questions on the practical implementation of the indexes >>>>> and >>>>> how we can do this efficiently so this is only a very early feedback from >>>>> my end. >>>>> >>>>> >>>>> Cheers, >>>>> >>>>> Gyula >>>>> >>>>> >>>>> >>>>> On Fri, May 9, 2025 at 12:14 AM Anton Okolnychyi < >>>>> aokolnyc...@gmail.com> wrote: >>>>> >>>>>> I am glad to see that folks are thinking about this problem. I am >>>>>> looking forward to a formal proposal/design doc to discuss details! >>>>>> >>>>>> Overall, this aligns with what we discussed in the >>>>>> community earlier w.r.t. the future of equality deletes and streaming >>>>>> upserts. If I were to summarize, we have the following options: >>>>>> >>>>>> Option 1: Add an inverted index (potentially distributed) maintained >>>>>> by an engine that does streaming writes to always produce DVs, even in >>>>>> streaming use cases. Deprecate/remove equality deletes from Iceberg. >>>>>> Option 2: Add native indexing to Iceberg so that the lookup of >>>>>> positions is quick and efficient enough to be used in streaming upserts. >>>>>> Deprecate/remove equality deletes from Iceberg. >>>>>> Option 3: Rethink equality deletes, potentially by introducing more >>>>>> restrictions and trying to scope them to particular data files, similar >>>>>> to >>>>>> DVs. >>>>>> Option 4: Standardize on a view reconciliation approach that Tabular >>>>>> implemented for CDC. >>>>>> >>>>>> I would like to highlight that what Spark does today during MERGE is >>>>>> similar to a lookup in an inverted index represented by another Iceberg >>>>>> table. That is OK for batch jobs but not enough for streaming. >>>>>> >>>>>> - Anton >>>>>> >>>>>> чт, 8 трав. 2025 р. о 10:08 Xiaoxuan Li <xiaoxuan.li....@gmail.com> >>>>>> пише: >>>>>> >>>>>>> Hi Zheng, Steven, Amogh and Gyula. Thank you all for the feedback! >>>>>>> >>>>>>> I agree with everyone, we need to narrow down the scope of this >>>>>>> optimization. The primary issue I'm trying to address is the slow read >>>>>>> performance caused by the growing number of equality delete >>>>>>> files(streaming >>>>>>> CDC scenarios). The other potential use cases are only mentioned to show >>>>>>> the extensibility of this approach. >>>>>>> >>>>>>> And both equality and positional deletes suffer from the same core >>>>>>> problem, records are evaluated repeatedly against multiple delete >>>>>>> predicates, at read time for equality deletes, and at write time for >>>>>>> positional deletes. This repeated evaluation is where the real >>>>>>> bottleneck >>>>>>> lies. >>>>>>> >>>>>>> It’s particularly bad for equality deletes, since we constantly >>>>>>> recompute row positions during reads without ever materializing them. >>>>>>> Eventually, a maintenance job is required just to rewrite them into >>>>>>> positional deletes. >>>>>>> >>>>>>> The idea is to eliminate this overhead by introducing an inverted >>>>>>> index that maps values (or value combinations) directly to row >>>>>>> positions. >>>>>>> This lets us skip full predicate evaluation and jump straight to the >>>>>>> affected rows, similar to how Hudi uses a record-level index for fast >>>>>>> upserts. If we can fetch row positions from the index, we can greatly >>>>>>> reduce overhead during reads (for equality deletes) and writes (for >>>>>>> positional deletes). >>>>>>> >>>>>>> In fact, if we can make positional deletes perform as well as >>>>>>> equality deletes using this index, we might be able to get rid of >>>>>>> equality >>>>>>> deletes, but that needs to be evaluated for the upsert case. >>>>>>> >>>>>>> That’s part of the reason why other types of indexes came up, >>>>>>> they’re applicable beyond just primary key columns, but CDC is the only >>>>>>> scenario with stricter SLA requirements. So it makes sense to align on >>>>>>> the >>>>>>> exact problem and use cases. For now, I think we can define our main >>>>>>> goal >>>>>>> as supporting inverted indexing over primary key columns to address the >>>>>>> slowness of reading caused by the growing number of equality delete >>>>>>> files. >>>>>>> >>>>>>> Thanks, Amogh, for bringing up the comparison with known >>>>>>> alternatives. We should include benchmarks for those as well, to >>>>>>> illustrate >>>>>>> the trade-offs in read/write performance, storage usage, and overall >>>>>>> cost. >>>>>>> >>>>>>> Really appreciate your feedback! I’ll incorporate these into the >>>>>>> next revision of the proposal. >>>>>>> >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Xiaoxuan >>>>>>> >>>>>>> On Thu, May 8, 2025 at 1:25 AM Gyula Fóra <gyula.f...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Thank you for the proposal! >>>>>>>> >>>>>>>> I agree with what had been said above that we need to narrow down >>>>>>>> the scope here and what is the primary target for the optimization. >>>>>>>> >>>>>>>> As Amogh has also pointed out, CDC (streaming) read performance >>>>>>>> (with equality deletes) would be one of the biggest beneficiaries of >>>>>>>> this >>>>>>>> at a first glance. >>>>>>>> This is especially important for Flink users where this feature is >>>>>>>> currently completely missing and there is a big demand for it as we >>>>>>>> rely on >>>>>>>> equality deletes on the write path. [1] >>>>>>>> >>>>>>>> I am not aware of alternative proposals that would solve the >>>>>>>> equality delete cdc read performance question, overall I think using >>>>>>>> indices is reasonable and a very promising approach. >>>>>>>> >>>>>>>> Looking forward to more details and discussion! >>>>>>>> Gyula >>>>>>>> >>>>>>>> [1] >>>>>>>> https://lists.apache.org/thread/njmxjmjjm341fp4mgynn483v15mhk7qd >>>>>>>> >>>>>>>> >>>>>>>> On Thu, May 8, 2025 at 9:24 AM Amogh Jahagirdar <2am...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thank you for the proposal Xiaoxuan! I think I agree with Zheng >>>>>>>>> and Steven's point that it'll probably be more helpful to start out >>>>>>>>> with >>>>>>>>> more specific "what" and "why" (known areas of improvement for >>>>>>>>> Iceberg and >>>>>>>>> driven by any use cases) before we get too deep into the "how". >>>>>>>>> >>>>>>>>> In my mind, the specific known area of improvement for Iceberg >>>>>>>>> related to this proposal is improving streaming upsert behavior. One >>>>>>>>> area >>>>>>>>> this improvement is beneficial for is being able to provide better >>>>>>>>> data >>>>>>>>> freshness for Iceberg CDC mirror tables without the heavy read + >>>>>>>>> maintenance cost that currently exist with Flink upserts. >>>>>>>>> >>>>>>>>> As you mentioned, equality deletes have the benefit of being very >>>>>>>>> cheap to write but can come at a high and unpredictable cost at read >>>>>>>>> time. >>>>>>>>> Challenges with equality deletes have been discussed in the past [1]. >>>>>>>>> I'll also add that if one of the goals is to improving streaming >>>>>>>>> upserts (e.g. for applying CDC change streams into Iceberg mirror >>>>>>>>> tables), >>>>>>>>> then there are alternatives that I think we should compare against to >>>>>>>>> make >>>>>>>>> the tradeoffs clear. These alternatives include leveraging the >>>>>>>>> known changelog view or merge patterns [2] or improving the existing >>>>>>>>> maintenance procedures. >>>>>>>>> >>>>>>>>> I think the potential for being able to use a inverted index for >>>>>>>>> upsert cases to more directly identify positions in a file to directly >>>>>>>>> write DVs is very exciting, but before getting too far into the >>>>>>>>> weeds, I >>>>>>>>> think it'd first be helpful >>>>>>>>> to make sure we agree on the specific problem we're trying to >>>>>>>>> solve when we talk about performance improvements along with any use >>>>>>>>> cases, >>>>>>>>> followed by comparison with known alternatives (ideally we can get >>>>>>>>> numbers >>>>>>>>> that demonstrate the read/write/storage/cost tradeoffs for the >>>>>>>>> proposed >>>>>>>>> inverted index). >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> https://lists.apache.org/thread/z0gvco6hn2bpgngvk4h6xqrnw8b32sw6 >>>>>>>>> [2]https://www.tabular.io/blog/hello-world-of-cdc/ >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> Amogh Jahagirdar >>>>>>>>> >>>>>>>>